Skip to content

Commit

Permalink
implement transport api for PPL inter-plugin communication (#533) (#778)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhongnan Su <[email protected]>

Signed-off-by: Zhongnan Su <[email protected]>
Co-authored-by: Zhongnan Su <[email protected]>
  • Loading branch information
penghuo and zhongnansu authored Aug 18, 2022
1 parent 0576d96 commit f14c763
Show file tree
Hide file tree
Showing 9 changed files with 602 additions and 201 deletions.
16 changes: 15 additions & 1 deletion plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

plugins {
id 'java'
id "io.freefair.lombok"
id 'jacoco'
id 'opensearch.opensearchplugin'
}
Expand Down Expand Up @@ -72,7 +73,6 @@ publishing {
}

javadoc.enabled = false
test.enabled = false
loggerUsageCheck.enabled = false
dependencyLicenses.enabled = false
thirdPartyAudit.enabled = false
Expand All @@ -89,6 +89,13 @@ configurations.all {
resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jackson_version}"
resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_databind_version}"
}
compileJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}

compileTestJava {
options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor'])
}

dependencies {
api group: 'org.springframework', name: 'spring-beans', version: "${spring_version}"
Expand All @@ -97,6 +104,13 @@ dependencies {
api project(':opensearch')
}

test {
include '**/*Test.class'
testLogging {
events "passed", "skipped", "failed"
exceptionFormat "full"
}
}

ext {
projectSubstitutions = [:]
Expand Down
83 changes: 51 additions & 32 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/


package org.opensearch.sql.plugin;

import com.google.common.collect.ImmutableList;
Expand All @@ -13,6 +12,9 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionResponse;
import org.opensearch.action.ActionType;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -48,6 +50,9 @@
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
import org.opensearch.sql.plugin.rest.RestPPLStatsAction;
import org.opensearch.sql.plugin.rest.RestQuerySettingsAction;
import org.opensearch.sql.plugin.transport.PPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryAction;
import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -57,9 +62,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin {

private ClusterService clusterService;

/**
* Settings should be inited when bootstrap the plugin.
*/
/** Settings should be inited when bootstrap the plugin. */
private org.opensearch.sql.common.setting.Settings pluginSettings;

public String name() {
Expand All @@ -71,49 +74,68 @@ public String description() {
}

@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
public List<RestHandler> getRestHandlers(
Settings settings,
RestController restController,
ClusterSettings clusterSettings,
IndexScopedSettings indexScopedSettings,
SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster) {
Objects.requireNonNull(clusterService, "Cluster service is required");
Objects.requireNonNull(pluginSettings, "Cluster settings is required");

LocalClusterState.state().setResolver(indexNameExpressionResolver);
Metrics.getInstance().registerDefaultMetrics();

return Arrays.asList(
new RestPPLQueryAction(restController, clusterService, pluginSettings, settings),
new RestPPLQueryAction(pluginSettings, settings),
new RestSqlAction(settings, clusterService, pluginSettings),
new RestSqlStatsAction(settings, restController),
new RestPPLStatsAction(settings, restController),
new RestQuerySettingsAction(settings, restController)
);
new RestQuerySettingsAction(settings, restController));
}

/** Register action and handler so that transportClient can find proxy for action. */
@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(
new ActionType<>(PPLQueryAction.NAME, TransportPPLQueryResponse::new),
TransportPPLQueryAction.class));
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry contentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameResolver,
Supplier<RepositoriesService>
repositoriesServiceSupplier) {
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry contentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier) {
this.clusterService = clusterService;
this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings());

LocalClusterState.state().setClusterService(clusterService);
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);

return super
.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService,
contentRegistry, environment, nodeEnvironment, namedWriteableRegistry,
indexNameResolver, repositoriesServiceSupplier);
return super.createComponents(
client,
clusterService,
threadPool,
resourceWatcherService,
scriptService,
contentRegistry,
environment,
nodeEnvironment,
namedWriteableRegistry,
indexNameResolver,
repositoriesServiceSupplier);
}

@Override
Expand All @@ -124,9 +146,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME,
OpenSearchExecutors.allocatedProcessors(settings),
1000,
null
)
);
null));
}

@Override
Expand All @@ -141,5 +161,4 @@ public List<Setting<?>> getSettings() {
public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
return new ExpressionScriptEngine(new DefaultExpressionSerializer());
}

}
Loading

0 comments on commit f14c763

Please sign in to comment.