Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Action dependency injection #466

Merged
merged 9 commits into from
Feb 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions PLUGIN_MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ The [Java Client for OpenSearch](https://github.com/opensearch-project/opensearc
The `SDKRestClient` provides wrapper methods matching the `Client` API (but not implementing it), implemented internally with the (soon to be deprecated) `RestHighLevelClient`. While this speeds migration efforts, it should be considered a temporary "bridge" with follow up migration efforts to the `OpenSearchClient` planned.
- While the class names and method parameters are the same, the `Request` and `Response` classes are often in different packages. In most cases, other than changing `import` statements, no additional code changes are required. In a few cases, there are minor changes required to interface with the new response class API.

The `client.execute(action, request, responseListener)` method is not yet implemented. Instead:
- Instantiate an instance of the corresponding transport action
- Pass the `request` and `responseListener` to the action's `doExecute()` method.
The `client.execute(action, request, responseListener)` method is implemented on the SDKClient.

Remove the transport action inheritance from HandledTransportAction. This may change to direct inheritance of `TransportAction` and implementation of `execute()`.
Change the transport action inheritance from HandledTransportAction to directly inherit from `TransportAction`.

### Replace RestHandler with ExtensionRestHandler

Expand Down
15 changes: 0 additions & 15 deletions src/main/java/org/opensearch/sdk/Extension.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,7 @@
import java.util.Collections;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.threadpool.ExecutorBuilder;
Expand Down Expand Up @@ -63,17 +59,6 @@ default Collection<Object> createComponents() {
return Collections.emptyList();
}

/**
* Gets an optional list of custom {@link TransportAction} for the extension to register with OpenSearch.
* <p>
* TODO: ActionExtension#getActions will replace this: https://github.com/opensearch-project/opensearch-sdk-java/issues/368
*
* @return a list of custom transport actions this extension uses.
*/
default Map<String, Class<? extends TransportAction<? extends ActionRequest, ? extends ActionResponse>>> getActionsMap() {
return Collections.emptyMap();
}

/**
* Provides the list of this Extension's custom thread pools, empty if
* none.
Expand Down
57 changes: 41 additions & 16 deletions src/main/java/org/opensearch/sdk/ExtensionsRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.support.TransportAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.extensions.rest.ExtensionRestRequest;
Expand All @@ -31,13 +34,15 @@
import org.opensearch.sdk.handlers.ClusterSettingsResponseHandler;
import org.opensearch.sdk.handlers.ClusterStateResponseHandler;
import org.opensearch.sdk.handlers.EnvironmentSettingsResponseHandler;
import org.opensearch.sdk.action.SDKActionModule;
import org.opensearch.sdk.handlers.AcknowledgedResponseHandler;
import org.opensearch.sdk.handlers.ExtensionDependencyResponseHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleNameRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsIndicesModuleRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsInitRequestHandler;
import org.opensearch.sdk.handlers.ExtensionsRestRequestHandler;
import org.opensearch.sdk.handlers.UpdateSettingsRequestHandler;
import org.opensearch.tasks.TaskManager;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportResponseHandler;
Expand All @@ -50,6 +55,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -108,6 +114,10 @@ public class ExtensionsRunner {
* A thread pool for the extension.
*/
private final ThreadPool threadPool;
/**
* A task manager for the extension
*/
private final TaskManager taskManager;
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

private ExtensionNamedXContentRegistry extensionNamedXContentRegistry = new ExtensionNamedXContentRegistry(
Settings.EMPTY,
Expand All @@ -119,14 +129,10 @@ public class ExtensionsRunner {
new ExtensionsIndicesModuleNameRequestHandler();
private ExtensionsRestRequestHandler extensionsRestRequestHandler = new ExtensionsRestRequestHandler(extensionRestPathRegistry);

/*
* TODO: expose an interface for extension to register actions
* https://github.com/opensearch-project/opensearch-sdk-java/issues/119
*/
/**
* Instantiates a new transportActions
*/
public TransportActions transportActions;
public final TransportActions transportActions;

/**
* Instantiates a new update settings request handler
Expand All @@ -148,20 +154,45 @@ protected ExtensionsRunner(Extension extension) throws IOException {
.put(TransportSettings.PORT.getKey(), extensionSettings.getHostPort())
.build();
this.threadPool = new ThreadPool(settings);
this.taskManager = new TaskManager(settings, threadPool, Collections.emptySet());

Guice.createInjector(b -> {
// TODO: Move this map instantiation inside TransportActions and provide register API
// to move logic from the module stream
// https://github.com/opensearch-project/opensearch-sdk-java/issues/467
Map<String, Class<? extends TransportAction<? extends ActionRequest, ? extends ActionResponse>>> transportActionsMap =
new HashMap<>();

List<com.google.inject.Module> modules = new ArrayList<>();
// Base bindings
modules.add(b -> {
b.bind(ExtensionsRunner.class).toInstance(this);
b.bind(Extension.class).toInstance(extension);

// FIXME: Change this to a provider interface
// https://github.com/opensearch-project/opensearch-sdk-java/issues/447
b.bind(NamedXContentRegistry.class).toInstance(getNamedXContentRegistry().getRegistry());
b.bind(ThreadPool.class).toInstance(getThreadPool());
b.bind(TaskManager.class).toInstance(taskManager);

b.bind(SDKClient.class).toInstance(new SDKClient());
b.bind(SDKClient.class);
b.bind(SDKClusterService.class).toInstance(new SDKClusterService(this));

// create components
injectComponents(b);
});
// Bind the return values from create components
modules.add(this::injectComponents);
// Bind actions from getActions
if (extension instanceof ActionExtension) {
SDKActionModule sdkActionModule = new SDKActionModule((ActionExtension) extension);
modules.add(sdkActionModule);
// save custom transport actions
sdkActionModule.getActions()
.entrySet()
.stream()
.forEach(h -> { transportActionsMap.put(h.getKey(), h.getValue().getTransportAction()); });
}
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

Guice.createInjector(modules);

this.transportActions = new TransportActions(transportActionsMap);

if (extension instanceof ActionExtension) {
// store REST handlers in the registry
Expand All @@ -170,17 +201,11 @@ protected ExtensionsRunner(Extension extension) throws IOException {
extensionRestPathRegistry.registerHandler(route.getMethod(), route.getPath(), extensionRestHandler);
}
}
// TODO new getActions code will go here
}
// save custom settings
this.customSettings = extension.getSettings();
// save custom namedXContent
this.customNamedXContent = extension.getNamedXContent();
// save custom transport actions
this.transportActions = new TransportActions(extension.getActionsMap());

// TODO: put getactions in a MapBinder
// Requires https://github.com/opensearch-project/opensearch-sdk-java/pull/434
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down
45 changes: 42 additions & 3 deletions src/main/java/org/opensearch/sdk/SDKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.inject.Inject;
import com.fasterxml.jackson.datatype.guava.GuavaModule;
import org.apache.hc.core5.function.Factory;
import org.apache.hc.core5.http.HttpHost;
Expand All @@ -28,6 +30,9 @@
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionType;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.delete.DeleteRequest;
Expand All @@ -40,6 +45,7 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.TransportAction;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Cancellable;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -74,6 +80,16 @@ public class SDKClient implements Closeable {
private RestClient restClient;
private RestHighLevelClient sdkRestClient;

// Used by client.execute
@SuppressWarnings("rawtypes")
@Inject
private Map<ActionType, TransportAction> actions;

/**
* Instantiate this client.
*/
public SDKClient() {}

/**
* Create and configure a RestClientBuilder
*
Expand Down Expand Up @@ -126,7 +142,7 @@ public OpenSearchClient initializeJavaClient(ExtensionSettings settings) {
* Initializes an OpenSearchClient using OpenSearch JavaClient
*
* @param hostAddress The address of OpenSearch cluster, client can connect to
* @param port The port of OpenSearch cluster
* @param port The port of OpenSearch cluster
* @return The SDKClient implementation of OpenSearchClient. The user is responsible for calling
* {@link #doCloseJavaClient()} when finished with the client
*/
Expand Down Expand Up @@ -215,6 +231,28 @@ public void close() throws IOException {
doCloseHighLevelClient();
}

/**
* Executes a Transport Action
*
* @param <Request> The Request type for the action
* @param <Response> The Response type for the action
* @param action The registered action
* @param request The Request
* @param listener An action listener for the Response
*/
public final <Request extends ActionRequest, Response extends ActionResponse> void execute(
ActionType<Response> action,
Request request,
ActionListener<Response> listener
) {
@SuppressWarnings("unchecked")
TransportAction<Request, Response> transportAction = actions.get(action);
if (transportAction == null) {
throw new IllegalStateException("failed to find action [" + action + "] to execute");
}
transportAction.execute(request, listener);
}

/**
* Wraps an internal {@link RestHighLevelClient} using method signatures expected by {@link Client} and {@link org.opensearch.client.AdminClient} syntax, providing a drop-in replacement in existing plugins with a minimum of code changes.
* <p>
Expand Down Expand Up @@ -286,6 +324,7 @@ public void get(GetRequest request, ActionListener<GetResponse> listener) {

/**
* Gets all the documents that match the criteria
*
* @param request The multiGet Request
* @param listener A listener to be notified with a result
*/
Expand All @@ -296,7 +335,7 @@ public void multiGet(MultiGetRequest request, ActionListener<MultiGetResponse> l
/**
* Deletes a document from the index based on the index, and id.
*
* @param request The delete request
* @param request The delete request
* @param listener A listener to be notified with a result
* @see Requests#deleteRequest(String)
*/
Expand All @@ -307,7 +346,7 @@ public void delete(DeleteRequest request, ActionListener<DeleteResponse> listene
/**
* Search across one or more indices with a query.
*
* @param request The search request
* @param request The search request
* @param listener A listener to be notified of the result
* @see Requests#searchRequest(String...)
*/
Expand Down
95 changes: 95 additions & 0 deletions src/main/java/org/opensearch/sdk/action/SDKActionModule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.sdk.action;

import java.util.Map;
import java.util.stream.Collectors;

import org.opensearch.action.ActionType;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.TransportAction;
import org.opensearch.common.NamedRegistry;
import org.opensearch.sdk.ActionExtension.ActionHandler;

import com.google.inject.AbstractModule;
import com.google.inject.multibindings.MapBinder;

import org.opensearch.sdk.ActionExtension;

import static java.util.Collections.unmodifiableMap;

/**
* A module for injecting getActions classes into Guice.
*/
public class SDKActionModule extends AbstractModule {

private final Map<String, ActionHandler<?, ?>> actions;
private final ActionFilters actionFilters;

/**
* Instantiate this module
*
* @param extension An instance of {@link ActionExtension}.
*/
public SDKActionModule(ActionExtension extension) {
this.actions = setupActions(extension);
this.actionFilters = setupActionFilters(extension);
// TODO: consider moving Rest Handler registration here
}

public Map<String, ActionHandler<?, ?>> getActions() {
return actions;
}

public ActionFilters getActionFilters() {
return actionFilters;
}

private static Map<String, ActionHandler<?, ?>> setupActions(ActionExtension extension) {
// Subclass NamedRegistry for easy registration
class ActionRegistry extends NamedRegistry<ActionHandler<?, ?>> {
ActionRegistry() {
super("action");
}

public void register(ActionHandler<?, ?> handler) {
register(handler.getAction().name(), handler);
}
}
ActionRegistry actions = new ActionRegistry();
// Register getActions in it
extension.getActions().stream().forEach(actions::register);

return unmodifiableMap(actions.getRegistry());
}

private static ActionFilters setupActionFilters(ActionExtension extension) {
return new ActionFilters(extension.getActionFilters().stream().collect(Collectors.toSet()));
}

@Override
protected void configure() {
// Bind action filters
bind(ActionFilters.class).toInstance(actionFilters);
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved

// bind ActionType -> transportAction Map used by Client
@SuppressWarnings("rawtypes")
MapBinder<ActionType, TransportAction> transportActionsBinder = MapBinder.newMapBinder(
binder(),
ActionType.class,
TransportAction.class
);
for (ActionHandler<?, ?> action : actions.values()) {
// bind the action as eager singleton, so the map binder one will reuse it
bind(action.getTransportAction()).asEagerSingleton();
transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton();
}
}
}
Loading