From f14c763634d7b52cd9fc284846d852d2445dd219 Mon Sep 17 00:00:00 2001 From: Peng Huo Date: Thu, 18 Aug 2022 11:02:26 -0700 Subject: [PATCH] implement transport api for PPL inter-plugin communication (#533) (#778) Signed-off-by: Zhongnan Su Signed-off-by: Zhongnan Su Co-authored-by: Zhongnan Su --- plugin/build.gradle | 16 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 83 +++--- .../sql/plugin/rest/RestPPLQueryAction.java | 236 +++++------------- .../sql/plugin/transport/PPLQueryAction.java | 18 ++ .../transport/TransportPPLQueryAction.java | 182 ++++++++++++++ .../transport/TransportPPLQueryRequest.java | 136 ++++++++++ .../transport/TransportPPLQueryResponse.java | 28 +++ .../TransportPPLQueryRequestTest.java | 101 ++++++++ .../sql/ppl/domain/PPLQueryRequest.java | 3 + 9 files changed, 602 insertions(+), 201 deletions(-) create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryAction.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryResponse.java create mode 100644 plugin/src/test/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequestTest.java diff --git a/plugin/build.gradle b/plugin/build.gradle index 952cb7cc97..78e08d7d0d 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -24,6 +24,7 @@ plugins { id 'java' + id "io.freefair.lombok" id 'jacoco' id 'opensearch.opensearchplugin' } @@ -72,7 +73,6 @@ publishing { } javadoc.enabled = false -test.enabled = false loggerUsageCheck.enabled = false dependencyLicenses.enabled = false thirdPartyAudit.enabled = false @@ -89,6 +89,13 @@ configurations.all { resolutionStrategy.force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${jackson_version}" resolutionStrategy.force "com.fasterxml.jackson.core:jackson-databind:${jackson_databind_version}" } +compileJava { + options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) +} + +compileTestJava { + options.compilerArgs.addAll(["-processor", 'lombok.launch.AnnotationProcessorHider$AnnotationProcessor']) +} dependencies { api group: 'org.springframework', name: 'spring-beans', version: "${spring_version}" @@ -97,6 +104,13 @@ dependencies { api project(':opensearch') } +test { + include '**/*Test.class' + testLogging { + events "passed", "skipped", "failed" + exceptionFormat "full" + } +} ext { projectSubstitutions = [:] diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 1f6bd8e1ed..a4a03fde11 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.sql.plugin; import com.google.common.collect.ImmutableList; @@ -13,6 +12,9 @@ import java.util.List; import java.util.Objects; import java.util.function.Supplier; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionResponse; +import org.opensearch.action.ActionType; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.node.DiscoveryNodes; @@ -48,6 +50,9 @@ import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; +import org.opensearch.sql.plugin.transport.PPLQueryAction; +import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -57,9 +62,7 @@ public class SQLPlugin extends Plugin implements ActionPlugin, ScriptPlugin { private ClusterService clusterService; - /** - * Settings should be inited when bootstrap the plugin. - */ + /** Settings should be inited when bootstrap the plugin. */ private org.opensearch.sql.common.setting.Settings pluginSettings; public String name() { @@ -71,12 +74,14 @@ public String description() { } @Override - public List getRestHandlers(Settings settings, RestController restController, - ClusterSettings clusterSettings, - IndexScopedSettings indexScopedSettings, - SettingsFilter settingsFilter, - IndexNameExpressionResolver indexNameExpressionResolver, - Supplier nodesInCluster) { + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster) { Objects.requireNonNull(clusterService, "Cluster service is required"); Objects.requireNonNull(pluginSettings, "Cluster settings is required"); @@ -84,36 +89,53 @@ public List getRestHandlers(Settings settings, RestController restC Metrics.getInstance().registerDefaultMetrics(); return Arrays.asList( - new RestPPLQueryAction(restController, clusterService, pluginSettings, settings), + new RestPPLQueryAction(pluginSettings, settings), new RestSqlAction(settings, clusterService, pluginSettings), new RestSqlStatsAction(settings, restController), new RestPPLStatsAction(settings, restController), - new RestQuerySettingsAction(settings, restController) - ); + new RestQuerySettingsAction(settings, restController)); + } + + /** Register action and handler so that transportClient can find proxy for action. */ + @Override + public List> getActions() { + return Arrays.asList( + new ActionHandler<>( + new ActionType<>(PPLQueryAction.NAME, TransportPPLQueryResponse::new), + TransportPPLQueryAction.class)); } @Override - public Collection createComponents(Client client, ClusterService clusterService, - ThreadPool threadPool, - ResourceWatcherService resourceWatcherService, - ScriptService scriptService, - NamedXContentRegistry contentRegistry, - Environment environment, - NodeEnvironment nodeEnvironment, - NamedWriteableRegistry namedWriteableRegistry, - IndexNameExpressionResolver indexNameResolver, - Supplier - repositoriesServiceSupplier) { + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry contentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameResolver, + Supplier repositoriesServiceSupplier) { this.clusterService = clusterService; this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); - return super - .createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, - contentRegistry, environment, nodeEnvironment, namedWriteableRegistry, - indexNameResolver, repositoriesServiceSupplier); + return super.createComponents( + client, + clusterService, + threadPool, + resourceWatcherService, + scriptService, + contentRegistry, + environment, + nodeEnvironment, + namedWriteableRegistry, + indexNameResolver, + repositoriesServiceSupplier); } @Override @@ -124,9 +146,7 @@ public List> getExecutorBuilders(Settings settings) { AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME, OpenSearchExecutors.allocatedProcessors(settings), 1000, - null - ) - ); + null)); } @Override @@ -141,5 +161,4 @@ public List> getSettings() { public ScriptEngine getScriptEngine(Settings settings, Collection> contexts) { return new ExpressionScriptEngine(new DefaultExpressionSerializer()); } - } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java index f97baafa00..270113dc8a 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/RestPPLQueryAction.java @@ -3,7 +3,6 @@ * SPDX-License-Identifier: Apache-2.0 */ - package org.opensearch.sql.plugin.rest; import static org.opensearch.rest.RestStatus.BAD_REQUEST; @@ -11,11 +10,8 @@ import static org.opensearch.rest.RestStatus.OK; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; import static org.opensearch.sql.opensearch.executor.Scheduler.schedule; -import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.HashSet; import java.util.List; @@ -23,41 +19,27 @@ import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; import org.opensearch.client.node.NodeClient; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.BaseRestHandler; import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestChannel; -import org.opensearch.rest.RestController; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.antlr.SyntaxCheckException; -import org.opensearch.sql.common.response.ResponseListener; import org.opensearch.sql.common.setting.Settings; -import org.opensearch.sql.common.utils.LogUtils; import org.opensearch.sql.exception.ExpressionEvaluationException; import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; -import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse; -import org.opensearch.sql.executor.ExecutionEngine.QueryResponse; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.opensearch.response.error.ErrorMessageFactory; -import org.opensearch.sql.opensearch.security.SecurityAccess; import org.opensearch.sql.plugin.request.PPLQueryRequestFactory; -import org.opensearch.sql.ppl.PPLService; -import org.opensearch.sql.ppl.config.PPLServiceConfig; +import org.opensearch.sql.plugin.transport.PPLQueryAction; +import org.opensearch.sql.plugin.transport.TransportPPLQueryRequest; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; import org.opensearch.sql.ppl.domain.PPLQueryRequest; -import org.opensearch.sql.protocol.response.QueryResult; -import org.opensearch.sql.protocol.response.format.CsvResponseFormatter; -import org.opensearch.sql.protocol.response.format.Format; -import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; -import org.opensearch.sql.protocol.response.format.RawResponseFormatter; -import org.opensearch.sql.protocol.response.format.ResponseFormatter; -import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; -import org.opensearch.sql.protocol.response.format.VisualizationResponseFormatter; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class RestPPLQueryAction extends BaseRestHandler { public static final String QUERY_API_ENDPOINT = "/_plugins/_ppl"; @@ -67,30 +49,27 @@ public class RestPPLQueryAction extends BaseRestHandler { private static final Logger LOG = LogManager.getLogger(); - /** - * Cluster service required by bean initialization. - */ - private final ClusterService clusterService; - - /** - * Settings required by been initialization. - */ - private final Settings pluginSettings; - private final Supplier pplEnabled; - /** - * Constructor of RestPPLQueryAction. - */ - public RestPPLQueryAction(RestController restController, ClusterService clusterService, - Settings pluginSettings, - org.opensearch.common.settings.Settings clusterSettings) { + /** Constructor of RestPPLQueryAction. */ + public RestPPLQueryAction( + Settings pluginSettings, org.opensearch.common.settings.Settings clusterSettings) { super(); - this.clusterService = clusterService; - this.pluginSettings = pluginSettings; this.pplEnabled = - () -> MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings) - && (Boolean) pluginSettings.getSettingValue(Settings.Key.PPL_ENABLED); + () -> + MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings) + && (Boolean) pluginSettings.getSettingValue(Settings.Key.PPL_ENABLED); + } + + private static boolean isClientError(Exception e) { + return e instanceof NullPointerException + // NPE is hard to differentiate but more likely caused by bad query + || e instanceof IllegalArgumentException + || e instanceof IndexNotFoundException + || e instanceof SemanticCheckException + || e instanceof ExpressionEvaluationException + || e instanceof QueryEngineException + || e instanceof SyntaxCheckException; } @Override @@ -123,141 +102,62 @@ protected Set responseParams() { @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); - Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); - - LogUtils.addRequestId(); - + // TODO: need move to transport Action if (!pplEnabled.get()) { return channel -> reportError(channel, new IllegalAccessException( - "Either plugins.ppl.enabled or rest.action.multi.allow_explicit_index setting is false" - ), BAD_REQUEST); - } - - PPLService pplService = createPPLService(nodeClient); - PPLQueryRequest pplRequest = PPLQueryRequestFactory.getPPLRequest(request); - - return channel -> schedule(nodeClient, () -> { - if (pplRequest.isExplainRequest()) { - pplService.explain(pplRequest, createExplainResponseListener(channel)); - } else { - pplService.execute(pplRequest, createListener(channel, pplRequest)); - } - }); - } - - /** - * Ideally, the AnnotationConfigApplicationContext should be shared across Plugin. By default, - * spring construct all the bean as singleton. Currently, there are no better solution to - * create the bean in protocol scope. The limitations are - * alt-1, add annotation for bean @Scope(value = SCOPE_PROTOTYPE, proxyMode = TARGET_CLASS), it - * works by add the proxy, - * but when running in OpenSearch, all the operation need security permission whic is hard - * to control. - * alt-2, using ObjectFactory with @Autowired, it also works, but require add to all the - * configuration. - * We will revisit the current solution if any major issue found. - */ - private PPLService createPPLService(NodeClient client) { - return doPrivileged(() -> { - AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); - context.registerBean(ClusterService.class, () -> clusterService); - context.registerBean(NodeClient.class, () -> client); - context.registerBean(Settings.class, () -> pluginSettings); - context.register(OpenSearchPluginConfig.class); - context.register(PPLServiceConfig.class); - context.refresh(); - return context.getBean(PPLService.class); - }); - } - - /** - * TODO: need to extract an interface for both SQL and PPL action handler and move these - * common methods to the interface. This is not easy to do now because SQL action handler - * is still in legacy module. - */ - private ResponseListener createExplainResponseListener( - RestChannel channel) { - return new ResponseListener() { - @Override - public void onResponse(ExplainResponse response) { - sendResponse(channel, OK, new JsonResponseFormatter(PRETTY) { - @Override - protected Object buildJsonObject(ExplainResponse response) { - return response; - } - }.format(response)); - } - - @Override - public void onFailure(Exception e) { - LOG.error("Error happened during explain", e); - sendResponse(channel, INTERNAL_SERVER_ERROR, - "Failed to explain the query due to error: " + e.getMessage()); - } - }; - } - - private ResponseListener createListener(RestChannel channel, - PPLQueryRequest pplRequest) { - Format format = pplRequest.format(); - ResponseFormatter formatter; - if (format.equals(Format.CSV)) { - formatter = new CsvResponseFormatter(pplRequest.sanitize()); - } else if (format.equals(Format.RAW)) { - formatter = new RawResponseFormatter(); - } else if (format.equals(Format.VIZ)) { - formatter = new VisualizationResponseFormatter(pplRequest.style()); - } else { - formatter = new SimpleJsonResponseFormatter(PRETTY); + "Either plugins.ppl.enabled or rest.action.multi.allow_explicit_index setting is false"), + BAD_REQUEST); } - return new ResponseListener() { - @Override - public void onResponse(QueryResponse response) { - sendResponse(channel, OK, formatter.format(new QueryResult(response.getSchema(), - response.getResults()))); - } - @Override - public void onFailure(Exception e) { - LOG.error("Error happened during query handling", e); - if (isClientError(e)) { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); - reportError(channel, e, BAD_REQUEST); - } else { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); - reportError(channel, e, SERVICE_UNAVAILABLE); - } - } - }; - } - - private T doPrivileged(PrivilegedExceptionAction action) { - try { - return SecurityAccess.doPrivileged(action); - } catch (IOException e) { - throw new IllegalStateException("Failed to perform privileged action", e); - } + TransportPPLQueryRequest transportPPLQueryRequest = new TransportPPLQueryRequest( + PPLQueryRequestFactory.getPPLRequest(request) + ); + + return channel -> schedule(nodeClient, () -> + nodeClient.execute( + PPLQueryAction.INSTANCE, + transportPPLQueryRequest, + new ActionListener<>() { + @Override + public void onResponse(TransportPPLQueryResponse response) { + sendResponse(channel, OK, response.getResult()); + } + + @Override + public void onFailure(Exception e) { + if (transportPPLQueryRequest.isExplainRequest()) { + LOG.error("Error happened during explain", e); + sendResponse( + channel, + INTERNAL_SERVER_ERROR, + "Failed to explain the query due to error: " + e.getMessage()); + } else if (e instanceof IllegalAccessException) { + reportError(channel, e, BAD_REQUEST); + } else { + LOG.error("Error happened during query handling", e); + if (isClientError(e)) { + Metrics.getInstance() + .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS) + .increment(); + reportError(channel, e, BAD_REQUEST); + } else { + Metrics.getInstance() + .getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS) + .increment(); + reportError(channel, e, SERVICE_UNAVAILABLE); + } + } + } + })); } private void sendResponse(RestChannel channel, RestStatus status, String content) { - channel.sendResponse( - new BytesRestResponse(status, "application/json; charset=UTF-8", content)); + channel.sendResponse(new BytesRestResponse(status, "application/json; charset=UTF-8", content)); } private void reportError(final RestChannel channel, final Exception e, final RestStatus status) { - channel.sendResponse(new BytesRestResponse(status, - ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString())); - } - - private static boolean isClientError(Exception e) { - return e instanceof NullPointerException - // NPE is hard to differentiate but more likely caused by bad query - || e instanceof IllegalArgumentException - || e instanceof IndexNotFoundException - || e instanceof SemanticCheckException - || e instanceof ExpressionEvaluationException - || e instanceof QueryEngineException - || e instanceof SyntaxCheckException; + channel.sendResponse( + new BytesRestResponse( + status, ErrorMessageFactory.createErrorMessage(e, status.getStatus()).toString())); } } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryAction.java new file mode 100644 index 0000000000..0babdba4ca --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryAction.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.transport; + +import org.opensearch.action.ActionType; + +public class PPLQueryAction extends ActionType { + // Internal Action which is not used for public facing RestAPIs. + public static final String NAME = "cluster:admin/opensearch/ppl"; + public static final PPLQueryAction INSTANCE = new PPLQueryAction(); + + private PPLQueryAction() { + super(NAME, TransportPPLQueryResponse::new); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java new file mode 100644 index 0000000000..e4699b6f9f --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -0,0 +1,182 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.transport; + +import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Locale; +import java.util.Optional; +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.node.NodeClient; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.common.setting.Settings; +import org.opensearch.sql.common.utils.LogUtils; +import org.opensearch.sql.executor.ExecutionEngine; +import org.opensearch.sql.legacy.metrics.MetricName; +import org.opensearch.sql.legacy.metrics.Metrics; +import org.opensearch.sql.opensearch.security.SecurityAccess; +import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.sql.plugin.rest.OpenSearchPluginConfig; +import org.opensearch.sql.ppl.PPLService; +import org.opensearch.sql.ppl.config.PPLServiceConfig; +import org.opensearch.sql.ppl.domain.PPLQueryRequest; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.CsvResponseFormatter; +import org.opensearch.sql.protocol.response.format.Format; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.RawResponseFormatter; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.sql.protocol.response.format.SimpleJsonResponseFormatter; +import org.opensearch.sql.protocol.response.format.VisualizationResponseFormatter; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; + +/** Send PPL query transport action. */ +public class TransportPPLQueryAction + extends HandledTransportAction { + private final NodeClient client; + + /** Cluster service required by bean initialization. */ + private final ClusterService clusterService; + + /** Settings required by been initialization. */ + private final Settings pluginSettings; + + /** Constructor of TransportPPLQueryAction. */ + @Inject + public TransportPPLQueryAction( + TransportService transportService, + ActionFilters actionFilters, + NodeClient client, + ClusterService clusterService, + org.opensearch.common.settings.Settings clusterSettings) { + super(PPLQueryAction.NAME, transportService, actionFilters, TransportPPLQueryRequest::new); + this.client = client; + this.clusterService = clusterService; + this.pluginSettings = new OpenSearchSettings(clusterService.getClusterSettings()); + } + + /** + * {@inheritDoc} Transform the request and call super.doExecute() to support call from other + * plugins. + */ + @Override + protected void doExecute( + Task task, ActionRequest request, ActionListener listener) { + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_TOTAL).increment(); + Metrics.getInstance().getNumericalMetric(MetricName.PPL_REQ_COUNT_TOTAL).increment(); + + LogUtils.addRequestId(); + + PPLService pplService = createPPLService(client); + TransportPPLQueryRequest transportRequest = TransportPPLQueryRequest.fromActionRequest(request); + // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest + PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); + + if (transformedRequest.isExplainRequest()) { + pplService.explain(transformedRequest, createExplainResponseListener(listener)); + } else { + pplService.execute(transformedRequest, createListener(transformedRequest, listener)); + } + } + + private PPLService createPPLService(NodeClient client) { + return doPrivileged( + () -> { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.registerBean(ClusterService.class, () -> clusterService); + context.registerBean(NodeClient.class, () -> client); + context.registerBean(Settings.class, () -> pluginSettings); + context.register(OpenSearchPluginConfig.class); + context.register(PPLServiceConfig.class); + context.refresh(); + return context.getBean(PPLService.class); + }); + } + + private T doPrivileged(PrivilegedExceptionAction action) { + try { + return SecurityAccess.doPrivileged(action); + } catch (IOException e) { + throw new IllegalStateException("Failed to perform privileged action", e); + } + } + + /** + * TODO: need to extract an interface for both SQL and PPL action handler and move these common + * methods to the interface. This is not easy to do now because SQL action handler is still in + * legacy module. + */ + private ResponseListener createExplainResponseListener( + ActionListener listener) { + return new ResponseListener() { + @Override + public void onResponse(ExecutionEngine.ExplainResponse response) { + String responseContent = + new JsonResponseFormatter(PRETTY) { + @Override + protected Object buildJsonObject(ExecutionEngine.ExplainResponse response) { + return response; + } + }.format(response); + listener.onResponse(new TransportPPLQueryResponse(responseContent)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + } + + private ResponseListener createListener( + PPLQueryRequest pplRequest, ActionListener listener) { + Format format = format(pplRequest); + ResponseFormatter formatter; + if (format.equals(Format.CSV)) { + formatter = new CsvResponseFormatter(pplRequest.sanitize()); + } else if (format.equals(Format.RAW)) { + formatter = new RawResponseFormatter(); + } else if (format.equals(Format.VIZ)) { + formatter = new VisualizationResponseFormatter(pplRequest.style()); + } else { + formatter = new SimpleJsonResponseFormatter(JsonResponseFormatter.Style.PRETTY); + } + + return new ResponseListener() { + @Override + public void onResponse(ExecutionEngine.QueryResponse response) { + String responseContent = + formatter.format(new QueryResult(response.getSchema(), response.getResults())); + listener.onResponse(new TransportPPLQueryResponse(responseContent)); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }; + } + + private Format format(PPLQueryRequest pplRequest) { + String format = pplRequest.getFormat(); + Optional optionalFormat = Format.of(format); + if (optionalFormat.isPresent()) { + return optionalFormat.get(); + } else { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "response in %s format is not supported.", format)); + } + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java new file mode 100644 index 0000000000..5ced07c02e --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequest.java @@ -0,0 +1,136 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.transport; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Locale; +import java.util.Optional; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; +import lombok.experimental.Accessors; +import org.json.JSONObject; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.InputStreamStreamInput; +import org.opensearch.common.io.stream.OutputStreamStreamOutput; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.sql.ppl.domain.PPLQueryRequest; +import org.opensearch.sql.protocol.response.format.Format; +import org.opensearch.sql.protocol.response.format.JsonResponseFormatter; + +@RequiredArgsConstructor +public class TransportPPLQueryRequest extends ActionRequest { + public static final TransportPPLQueryRequest NULL = new TransportPPLQueryRequest("", null, ""); + private final String pplQuery; + @Getter private final JSONObject jsonContent; + + @Getter private final String path; + + @Getter private String format = ""; + + @Setter + @Getter + @Accessors(fluent = true) + private boolean sanitize = true; + + @Setter + @Getter + @Accessors(fluent = true) + private JsonResponseFormatter.Style style = JsonResponseFormatter.Style.COMPACT; + + /** Constructor of TransportPPLQueryRequest from PPLQueryRequest. */ + public TransportPPLQueryRequest(PPLQueryRequest pplQueryRequest) { + pplQuery = pplQueryRequest.getRequest(); + jsonContent = pplQueryRequest.getJsonContent(); + path = pplQueryRequest.getPath(); + format = pplQueryRequest.getFormat(); + sanitize = pplQueryRequest.sanitize(); + style = pplQueryRequest.style(); + } + + /** Constructor of TransportPPLQueryRequest from StreamInput. */ + public TransportPPLQueryRequest(StreamInput in) throws IOException { + super(in); + pplQuery = in.readOptionalString(); + format = in.readOptionalString(); + String jsonContentString = in.readOptionalString(); + jsonContent = jsonContentString != null ? new JSONObject(jsonContentString) : null; + path = in.readOptionalString(); + sanitize = in.readBoolean(); + style = in.readEnum(JsonResponseFormatter.Style.class); + } + + /** Re-create the object from the actionRequest. */ + public static TransportPPLQueryRequest fromActionRequest(final ActionRequest actionRequest) { + if (actionRequest instanceof TransportPPLQueryRequest) { + return (TransportPPLQueryRequest) actionRequest; + } + + try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); + OutputStreamStreamOutput osso = new OutputStreamStreamOutput(baos)) { + actionRequest.writeTo(osso); + try (InputStreamStreamInput input = + new InputStreamStreamInput(new ByteArrayInputStream(baos.toByteArray()))) { + return new TransportPPLQueryRequest(input); + } + } catch (IOException e) { + throw new IllegalArgumentException( + "failed to parse ActionRequest into TransportPPLQueryRequest", e); + } + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(pplQuery); + out.writeOptionalString(format); + out.writeOptionalString(jsonContent != null ? jsonContent.toString() : null); + out.writeOptionalString(path); + out.writeBoolean(sanitize); + out.writeEnum(style); + } + + public String getRequest() { + return pplQuery; + } + + /** + * Check if request is to explain rather than execute the query. + * + * @return true if it is an explain request + */ + public boolean isExplainRequest() { + return path.endsWith("/_explain"); + } + + /** Decide on the formatter by the requested format. */ + public Format format() { + Optional optionalFormat = Format.of(format); + if (optionalFormat.isPresent()) { + return optionalFormat.get(); + } else { + throw new IllegalArgumentException( + String.format(Locale.ROOT, "response in %s format is not supported.", format)); + } + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** Convert to PPLQueryRequest. */ + public PPLQueryRequest toPPLQueryRequest() { + PPLQueryRequest pplQueryRequest = new PPLQueryRequest(pplQuery, jsonContent, path, format); + pplQueryRequest.sanitize(sanitize); + pplQueryRequest.style(style); + return pplQueryRequest; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryResponse.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryResponse.java new file mode 100644 index 0000000000..bfd4994b07 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryResponse.java @@ -0,0 +1,28 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.transport; + +import java.io.IOException; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import org.opensearch.action.ActionResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +@RequiredArgsConstructor +public class TransportPPLQueryResponse extends ActionResponse { + @Getter private final String result; + + public TransportPPLQueryResponse(StreamInput in) throws IOException { + super(in); + result = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(result); + } +} diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequestTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequestTest.java new file mode 100644 index 0000000000..6e086f4d4c --- /dev/null +++ b/plugin/src/test/java/org/opensearch/sql/plugin/transport/TransportPPLQueryRequestTest.java @@ -0,0 +1,101 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.transport; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.common.io.stream.StreamOutput; + +public class TransportPPLQueryRequestTest { + + @Rule public ExpectedException exceptionRule = ExpectedException.none(); + + @Test + public void testValidate() { + TransportPPLQueryRequest request = new TransportPPLQueryRequest("source=t a=1", null, null); + assertNull(request.validate()); + } + + @Test + public void testTransportPPLQueryRequestFromActionRequest() { + TransportPPLQueryRequest request = new TransportPPLQueryRequest("source=t a=1", null, null); + assertEquals(TransportPPLQueryRequest.fromActionRequest(request), request); + } + + @Test + public void testCustomizedNonNullJSONContentActionRequestFromActionRequest() { + TransportPPLQueryRequest request = + new TransportPPLQueryRequest( + "source=t a=1", new JSONObject("{\"query\":\"source=t a=1\"}"), null); + ActionRequest actionRequest = + new ActionRequest() { + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + request.writeTo(out); + } + }; + TransportPPLQueryRequest recreatedObject = + TransportPPLQueryRequest.fromActionRequest(actionRequest); + assertNotSame(request, recreatedObject); + assertEquals(request.getRequest(), recreatedObject.getRequest()); + } + + @Test + public void testCustomizedNullJSONContentActionRequestFromActionRequest() { + TransportPPLQueryRequest request = new TransportPPLQueryRequest( + "source=t a=1", null, null + ); + ActionRequest actionRequest = + new ActionRequest() { + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + request.writeTo(out); + } + }; + TransportPPLQueryRequest recreatedObject = + TransportPPLQueryRequest.fromActionRequest(actionRequest); + assertNotSame(request, recreatedObject); + assertEquals(request.getRequest(), recreatedObject.getRequest()); + } + + @Test + public void testFailedParsingActionRequestFromActionRequest() { + ActionRequest actionRequest = + new ActionRequest() { + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString("sample"); + } + }; + exceptionRule.expect(IllegalArgumentException.class); + exceptionRule.expectMessage("failed to parse ActionRequest into TransportPPLQueryRequest"); + TransportPPLQueryRequest.fromActionRequest(actionRequest); + } +} diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java index 0a9959f045..0d8a4c63d1 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/domain/PPLQueryRequest.java @@ -21,8 +21,11 @@ public class PPLQueryRequest { public static final PPLQueryRequest NULL = new PPLQueryRequest("", null, "", ""); private final String pplQuery; + @Getter private final JSONObject jsonContent; + @Getter private final String path; + @Getter private String format = ""; @Setter