Skip to content

Commit

Permalink
Adding support to register Transport Actions (#115)
Browse files Browse the repository at this point in the history
* Adding support for registering Transport Actions with OpenSearch

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Adding support for registering Transport Actions with OpenSearch

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Fixing few typos

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Addressing comments

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Removed api package

Signed-off-by: Sarat Vemulapalli <[email protected]>

* Removing lingering API

Signed-off-by: Sarat Vemulapalli <[email protected]>

Signed-off-by: Sarat Vemulapalli <[email protected]>
  • Loading branch information
saratvemulapalli authored Sep 1, 2022
1 parent 694872a commit 10ea5a9
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 0 deletions.
6 changes: 6 additions & 0 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public class ExtensionsRunner {
private final TransportInterceptor NOOP_TRANSPORT_INTERCEPTOR = new TransportInterceptor() {
};
private NamedWriteableRegistryAPI namedWriteableRegistryApi = new NamedWriteableRegistryAPI();
/*
* TODO: expose an interface for extension to register actions
* https://github.com/opensearch-project/opensearch-sdk-java/issues/119
*/
private TransportActions transportActions = new TransportActions(new HashMap<>());

/**
* Instantiates a new Extensions Runner using test settings.
Expand Down Expand Up @@ -169,6 +174,7 @@ InitializeExtensionsResponse handleExtensionInitRequest(InitializeExtensionsRequ
setOpensearchNode(opensearchNode);
extensionTransportService.connectToNode(opensearchNode);
sendRegisterRestActionsRequest(extensionTransportService);
transportActions.sendRegisterTransportActionsRequest(extensionTransportService, opensearchNode);
}
}

Expand Down
61 changes: 61 additions & 0 deletions src/main/java/org/opensearch/sdk/TransportActions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.extensions.RegisterTransportActionsRequest;
import org.opensearch.sdk.handlers.ExtensionResponseHandler;
import org.opensearch.transport.TransportService;

import java.util.HashMap;
import java.util.Map;

/**
* This class helps manage transport actions for SDK
*/
public class TransportActions {
private final Logger logger = LogManager.getLogger(TransportActions.class);
private Map<String, Class> transportActions;

/**
* Constructor for TransportActions. Creates a map of transportActions for this extension.
* @param transportActions is the list of actions the extension would like to register with OpenSearch.
*/
public <Request extends ActionRequest, Response extends ActionResponse> TransportActions(
Map<String, Class<? extends TransportAction<Request, Response>>> transportActions
) {
this.transportActions = new HashMap(transportActions);
}

/**
* Requests that OpenSearch register the Transport Actions for this extension.
*
* @param transportService The TransportService defining the connection to OpenSearch.
* @param opensearchNode The OpenSearch node where transport actions being registered.
*/
public void sendRegisterTransportActionsRequest(TransportService transportService, DiscoveryNode opensearchNode) {
logger.info("Sending Register Transport Actions request to OpenSearch");
ExtensionResponseHandler registerTransportActionsResponseHandler = new ExtensionResponseHandler();
try {
transportService.sendRequest(
opensearchNode,
ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS,
new RegisterTransportActionsRequest(transportActions),
registerTransportActionsResponseHandler
);
} catch (Exception e) {
logger.info("Failed to send Register Transport Actions request to OpenSearch", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.sdk.handlers;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.extensions.ExtensionBooleanResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;

/**
* This class handles the response {{@link org.opensearch.extensions.ExtensionBooleanResponse }} from OpenSearch to Extension.
*/
public class ExtensionResponseHandler implements TransportResponseHandler<ExtensionBooleanResponse> {
private static final Logger logger = LogManager.getLogger(ExtensionResponseHandler.class);

@Override
public void handleResponse(ExtensionBooleanResponse response) {
logger.info("received {}", response);
}

@Override
public void handleException(TransportException exp) {
logger.info("Extension Request failed", exp);
}

@Override
public String executor() {
return ThreadPool.Names.GENERIC;
}

@Override
public ExtensionBooleanResponse read(StreamInput in) throws IOException {
return new ExtensionBooleanResponse(in);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.opensearch.sdk;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.main.MainRequest;
import org.opensearch.action.main.MainResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
import org.opensearch.extensions.ExtensionsOrchestrator;
import org.opensearch.sdk.handlers.ExtensionResponseHandler;
import org.opensearch.tasks.Task;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportService;

import java.net.InetAddress;
import java.util.Collections;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class TestExtensionTransportActionsAPI extends OpenSearchTestCase {

private TransportService transportService;
private TransportActions transportActions;
private DiscoveryNode opensearchNode;

private class TestTransportAction extends HandledTransportAction<MainRequest, MainResponse> {

protected TestTransportAction(
String actionName,
TransportService transportService,
ActionFilters actionFilters,
Writeable.Reader<MainRequest> mainRequestReader
) {
super(actionName, transportService, actionFilters, mainRequestReader);
}

@Override
protected void doExecute(Task task, MainRequest request, ActionListener<MainResponse> actionListener) {

}
}

@Override
@BeforeEach
public void setUp() throws Exception {
super.setUp();
this.transportActions = new TransportActions(Map.of("testAction", TestTransportAction.class));
this.transportService = spy(
new TransportService(
Settings.EMPTY,
mock(Transport.class),
null,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
x -> null,
null,
Collections.emptySet()
)
);
this.opensearchNode = new DiscoveryNode(
"test_node",
new TransportAddress(InetAddress.getByName("localhost"), 9876),
emptyMap(),
emptySet(),
Version.CURRENT
);
}

@Test
public void testRegisterTransportAction() {
transportActions.sendRegisterTransportActionsRequest(transportService, opensearchNode);
verify(transportService, times(1)).sendRequest(
any(),
eq(ExtensionsOrchestrator.REQUEST_EXTENSION_REGISTER_TRANSPORT_ACTIONS),
any(),
any(ExtensionResponseHandler.class)
);
}
}

0 comments on commit 10ea5a9

Please sign in to comment.