From 06ef1410ccbe0e9f44f5c0c9d0c18f88cd54035f Mon Sep 17 00:00:00 2001 From: Zhongnan Su Date: Thu, 12 May 2022 09:22:04 -0700 Subject: [PATCH 1/2] add SQL/PPL transport request/response models and send SQL/PPL interfaces Signed-off-by: Zhongnan Su --- .../commons/sql/SQLPluginInterface.kt | 91 +++++++++++++++++++ .../commons/sql/action/BaseResponse.kt | 32 +++++++ .../commons/sql/action/SQLActions.kt | 28 ++++++ .../sql/action/TransportPPLQueryRequest.kt | 79 ++++++++++++++++ .../sql/action/TransportPPLQueryResponse.kt | 57 ++++++++++++ .../sql/action/TransportSQLQueryRequest.kt | 79 ++++++++++++++++ .../sql/action/TransportSQLQueryResponse.kt | 57 ++++++++++++ .../opensearch/commons/sql/model/QueryType.kt | 43 +++++++++ .../commons/utils/TransportHelpers.kt | 2 +- .../commons/sql/SQLPluginInterfaceTest.kt | 59 ++++++++++++ .../action/TransportPPLQueryRequestTest.kt | 40 ++++++++ .../action/TransportPPLQueryResponseTest.kt | 19 ++++ .../action/TransportSQLQueryRequestTest.kt | 40 ++++++++ .../action/TransportSQLQueryResponseTest.kt | 19 ++++ 14 files changed, 644 insertions(+), 1 deletion(-) create mode 100644 src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt create mode 100644 src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt create mode 100644 src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt create mode 100644 src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt create mode 100644 src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt create mode 100644 src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt diff --git a/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt new file mode 100644 index 00000000..4c738018 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql + +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.common.io.stream.Writeable +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.sql.action.BaseResponse +import org.opensearch.commons.sql.action.SQLActions +import org.opensearch.commons.sql.action.TransportPPLQueryRequest +import org.opensearch.commons.sql.action.TransportPPLQueryResponse +import org.opensearch.commons.sql.action.TransportSQLQueryRequest +import org.opensearch.commons.sql.action.TransportSQLQueryResponse +import org.opensearch.commons.utils.SecureClientWrapper +import org.opensearch.commons.utils.recreateObject + +/** + * All the transport action plugin interfaces for the SQL plugin + */ +object SQLPluginInterface { + + /** + * Send SQL API enabled for a feature. No REST API. Internal API only for Inter plugin communication. + * @param client Node client for making transport action + * @param query The query string + * @param listener The listener for getting response + */ + fun sendSQLQuery( + client: NodeClient, + query: String, + listener: ActionListener + ) { + val threadContext: String? = + client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val wrapper = SecureClientWrapper(client) // Executing request in privileged mode + wrapper.execute( + SQLActions.SEND_SQL_QUERY_ACTION_TYPE, + TransportSQLQueryRequest(query, threadContext), + wrapActionListener(listener) { response -> recreateObject(response) { TransportSQLQueryResponse(it) } } + ) + } + + /** + * Send PPL API enabled for a feature. No REST API. Internal API only for Inter plugin communication. + * @param client Node client for making transport action + * @param query The query string + * @param listener The listener for getting response + */ + fun sendPPLQuery( + client: NodeClient, + query: String, + listener: ActionListener + ) { + val threadContext: String? = + client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val wrapper = SecureClientWrapper(client) // Executing request in privileged mode + wrapper.execute( + SQLActions.SEND_PPL_QUERY_ACTION_TYPE, + TransportPPLQueryRequest(query, threadContext), + wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } } + ) + } + + /** + * Wrap action listener on concrete response class by a new created one on ActionResponse. + * This is required because the response may be loaded by different classloader across plugins. + * The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate + * the response object. + */ + @Suppress("UNCHECKED_CAST") + private fun wrapActionListener( + listener: ActionListener, + recreate: (Writeable) -> Response + ): ActionListener { + return object : ActionListener { + override fun onResponse(response: ActionResponse) { + val recreated = response as? Response ?: recreate(response) + listener.onResponse(recreated) + } + + override fun onFailure(exception: java.lang.Exception) { + listener.onFailure(exception) + } + } as ActionListener + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt new file mode 100644 index 00000000..763072ca --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt @@ -0,0 +1,32 @@ +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.rest.RestStatus +import java.io.IOException + +/** + * Base response which give REST status. + */ +abstract class BaseResponse : ActionResponse, ToXContentObject { + + /** + * constructor for creating the class + */ + constructor() + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) + + /** + * get rest status for the response. Useful override for multi-status response. + * @return RestStatus for the response + */ + open fun getStatus(): RestStatus { + return RestStatus.OK + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt b/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt new file mode 100644 index 00000000..e5c69cdd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt @@ -0,0 +1,28 @@ +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionType + +object SQLActions { + + /** + * Send SQL query. Internal only - Inter plugin communication. + */ + const val SEND_SQL_QUERY_NAME = "cluster:admin/opensearch/sql" + + /** + * Send PPL query. Internal only - Inter plugin communication. + */ + const val SEND_PPL_QUERY_NAME = "cluster:admin/opensearch/ppl" + + /** + * Send SQL query transport action type. + */ + val SEND_SQL_QUERY_ACTION_TYPE = + ActionType(SEND_SQL_QUERY_NAME, ::TransportSQLQueryResponse) + + /** + * Send PPL query transport action type. + */ + val SEND_PPL_QUERY_ACTION_TYPE = + ActionType(SEND_PPL_QUERY_NAME, ::TransportPPLQueryResponse) +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt new file mode 100644 index 00000000..77271bfd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Request to send ppl query. + */ +class TransportPPLQueryRequest : ActionRequest, ToXContentObject { + val query: String + val threadContext: String? + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportPPLQueryRequest(it) } + } + + /** + * constructor for creating the class + * @param query the ppl query string + * @param threadContext the user info thread context + */ + constructor( + query: String, + threadContext: String? + ) { + this.query = query + this.threadContext = threadContext + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + query = input.readString() + threadContext = input.readOptionalString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + super.writeTo(output) + output.writeString(query) + output.writeOptionalString(threadContext) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport ppl request is not intended for REST or persistence and does not support XContent.") + } + + /** + * {@inheritDoc} + */ + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (query.isEmpty()) { + validationException = ValidateActions.addValidationError("query is empty", validationException) + } + return validationException + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt new file mode 100644 index 00000000..73a7b1d5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Response for ppl query. + */ +class TransportPPLQueryResponse : BaseResponse { + val queryResponse: String + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportPPLQueryResponse(it) } + } + + /** + * constructor for creating the class + * @param queryResponse the ppl query response + */ + constructor( + queryResponse: String, + ) { + this.queryResponse = queryResponse + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + queryResponse = input.readString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + output.writeString(queryResponse) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.") + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt new file mode 100644 index 00000000..5e59512c --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Request to send sql query. + */ +class TransportSQLQueryRequest : ActionRequest, ToXContentObject { + val query: String + val threadContext: String? + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportSQLQueryRequest(it) } + } + + /** + * constructor for creating the class + * @param query the sql query string + * @param threadContext the user info thread context + */ + constructor( + query: String, + threadContext: String? + ) { + this.query = query + this.threadContext = threadContext + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + query = input.readString() + threadContext = input.readOptionalString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + super.writeTo(output) + output.writeString(query) + output.writeOptionalString(threadContext) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport sql request is not intended for REST or persistence and does not support XContent.") + } + + /** + * {@inheritDoc} + */ + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (query.isEmpty()) { + validationException = ValidateActions.addValidationError("query is empty", validationException) + } + return validationException + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt new file mode 100644 index 00000000..e3cb0321 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Response for sql query. + */ +class TransportSQLQueryResponse : BaseResponse { + val queryResponse: String + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportSQLQueryResponse(it) } + } + + /** + * constructor for creating the class + * @param queryResponse the sql query response + */ + constructor( + queryResponse: String, + ) { + this.queryResponse = queryResponse + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + queryResponse = input.readString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + output.writeString(queryResponse) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport sql response is not intended for REST or persistence and does not support XContent.") + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt b/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt new file mode 100644 index 00000000..27944187 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.model + +import org.opensearch.commons.utils.EnumParser + +/** + * Enum for Notification config type + */ +enum class QueryType(val tag: String) { + NONE("none") { + override fun toString(): String { + return tag + } + }, + SQL("ppl") { + override fun toString(): String { + return tag + } + }, + PPL("sql") { + override fun toString(): String { + return tag + } + }; + companion object { + private val tagMap = values().associateBy { it.tag } + + val enumParser = EnumParser { fromTagOrDefault(it) } + + /** + * Get QueryType from tag or NONE if not found + * @param tag the tag + * @return QueryType corresponding to tag. NONE if invalid tag. + */ + fun fromTagOrDefault(tag: String): QueryType { + return tagMap[tag] ?: NONE + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt index 049dabdc..7ed91f96 100644 --- a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt +++ b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt @@ -26,7 +26,7 @@ val STRING_WRITER = Writeable.Writer { output: StreamOutput, value: String -> * This method needs to be inline and reified so that when this is called from * doExecute() of transport action, the object may be created from other JVM. */ -inline fun recreateObject(writeable: Writeable, block: (StreamInput) -> Request): Request { +inline fun recreateObject(writeable: Writeable, block: (StreamInput) -> Request): Request { ByteArrayOutputStream().use { byteArrayOutputStream -> OutputStreamStreamOutput(byteArrayOutputStream).use { writeable.writeTo(it) diff --git a/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt new file mode 100644 index 00000000..ceb7a1e3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql + +import com.nhaarman.mockitokotlin2.whenever +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Answers +import org.mockito.ArgumentMatchers +import org.mockito.Mock +import org.mockito.Mockito +import org.mockito.junit.jupiter.MockitoExtension +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionType +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.sql.action.TransportPPLQueryResponse +import org.opensearch.commons.sql.action.TransportSQLQueryResponse + +@Suppress("UNCHECKED_CAST") +@ExtendWith(MockitoExtension::class) +internal class SQLPluginInterfaceTest { + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private lateinit var client: NodeClient + + @Test + fun sendSQLQuery() { + val query = "SELECT * FROM account;" + val response = TransportSQLQueryResponse("sample response") + val listener: ActionListener = + Mockito.mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + SQLPluginInterface.sendSQLQuery(client, query, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + + @Test + fun sendPPLQuery() { + val query = "search source=accounts" + val response = TransportPPLQueryResponse("sample response") + val listener: ActionListener = + Mockito.mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + SQLPluginInterface.sendPPLQuery(client, query, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt new file mode 100644 index 00000000..1ce6f18d --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportPPLQueryRequestTest { + + private fun assertGetRequestEquals( + expected: TransportPPLQueryRequest, + actual: TransportPPLQueryRequest + ) { + assertEquals(expected.query, actual.query) + assertEquals(expected.threadContext, actual.threadContext) + assertNull(actual.validate()) + } + + @Test + fun `Send request serialize and deserialize transport object should be equal PPL`() { + val query = "search source=accounts" + val request = TransportPPLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } + assertGetRequestEquals(request, recreatedObject) + } + + @Test + fun `Send query request validate return exception if query is empty`() { + val query = "" + val request = TransportPPLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } + assertNotNull(recreatedObject.validate()) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt new file mode 100644 index 00000000..211847c2 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportPPLQueryResponseTest { + @Test + fun `Create response serialize and deserialize transport object should be equal`() { + val queryResponse = TransportPPLQueryResponse("response string") + val recreatedObject = recreateObject(queryResponse) { TransportPPLQueryResponse(it) } + assertEquals(queryResponse.queryResponse, recreatedObject.queryResponse) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt new file mode 100644 index 00000000..3308f4ab --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportSQLQueryRequestTest { + + private fun assertGetRequestEquals( + expected: TransportSQLQueryRequest, + actual: TransportSQLQueryRequest + ) { + assertEquals(expected.query, actual.query) + assertEquals(expected.threadContext, actual.threadContext) + assertNull(actual.validate()) + } + + @Test + fun `Send request serialize and deserialize transport object should be equal SQL`() { + val query = "SELECT * FROM account;" + val request = TransportSQLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportSQLQueryRequest(it) } + assertGetRequestEquals(request, recreatedObject) + } + + @Test + fun `Send query request validate return exception if query is empty`() { + val query = "" + val request = TransportSQLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportSQLQueryRequest(it) } + assertNotNull(recreatedObject.validate()) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt new file mode 100644 index 00000000..3a7b3eae --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportSQLQueryResponseTest { + @Test + fun `Create response serialize and deserialize transport object should be equal`() { + val queryResponse = TransportSQLQueryResponse("response string") + val recreatedObject = recreateObject(queryResponse) { TransportSQLQueryResponse(it) } + assertEquals(queryResponse.queryResponse, recreatedObject.queryResponse) + } +} From 005cf4d21b6df2b5f07d5a8bff047b9080ddc059 Mon Sep 17 00:00:00 2001 From: Zhongnan Su Date: Tue, 24 May 2022 02:58:39 -0700 Subject: [PATCH 2/2] refactor transportPPLQueryRequest Signed-off-by: Zhongnan Su --- .../commons/sql/SQLPluginInterface.kt | 12 +++--- .../sql/action/TransportPPLQueryRequest.kt | 35 ++++++++++++--- .../sql/action/TransportPPLQueryResponse.kt | 6 ++- .../opensearch/commons/sql/model/QueryType.kt | 43 ------------------- .../org/opensearch/commons/sql/model/Style.kt | 13 ++++++ .../commons/sql/SQLPluginInterfaceTest.kt | 5 ++- .../action/TransportPPLQueryRequestTest.kt | 10 +++-- 7 files changed, 63 insertions(+), 61 deletions(-) delete mode 100644 src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt create mode 100644 src/main/kotlin/org/opensearch/commons/sql/model/Style.kt diff --git a/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt index 4c738018..98b25fd5 100644 --- a/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt @@ -53,15 +53,15 @@ object SQLPluginInterface { */ fun sendPPLQuery( client: NodeClient, - query: String, + request: TransportPPLQueryRequest, listener: ActionListener ) { - val threadContext: String? = - client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) - val wrapper = SecureClientWrapper(client) // Executing request in privileged mode - wrapper.execute( +// val threadContext: String? = +// client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) +// val wrapper = SecureClientWrapper(client) // Executing request in privileged mode + client.execute( SQLActions.SEND_PPL_QUERY_ACTION_TYPE, - TransportPPLQueryRequest(query, threadContext), + request, wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } } ) } diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt index 77271bfd..c20aeefc 100644 --- a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt @@ -14,13 +14,18 @@ import org.opensearch.common.io.stream.Writeable import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentObject import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.commons.sql.model.Style import java.io.IOException /** * Action Request to send ppl query. */ class TransportPPLQueryRequest : ActionRequest, ToXContentObject { - val query: String + val pplQuery: String + val path: String + var format: String + var sanitize: Boolean? + var style: Style? val threadContext: String? companion object { @@ -37,9 +42,17 @@ class TransportPPLQueryRequest : ActionRequest, ToXContentObject { */ constructor( query: String, + path: String, + format: String = "", + sanitize: Boolean? = true, + style: Style? = Style.COMPACT, threadContext: String? ) { - this.query = query + this.pplQuery = query + this.path = path + this.format = format + this.sanitize = sanitize + this.style = style this.threadContext = threadContext } @@ -48,22 +61,30 @@ class TransportPPLQueryRequest : ActionRequest, ToXContentObject { */ @Throws(IOException::class) constructor(input: StreamInput) : super(input) { - query = input.readString() + pplQuery = input.readString() + path = input.readString() + format = input.readString() + sanitize = input.readOptionalBoolean() + style = input.readEnum(Style::class.java) threadContext = input.readOptionalString() } @Throws(IOException::class) override fun writeTo(output: StreamOutput) { super.writeTo(output) - output.writeString(query) + output.writeString(pplQuery) + output.writeString(path) + output.writeOptionalString(format) + output.writeOptionalBoolean(sanitize) + output.writeEnum(style) output.writeOptionalString(threadContext) } /** * {@inheritDoc} */ - override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { - throw IllegalStateException("Transport ppl request is not intended for REST or persistence and does not support XContent.") + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder? { + return null } /** @@ -71,7 +92,7 @@ class TransportPPLQueryRequest : ActionRequest, ToXContentObject { */ override fun validate(): ActionRequestValidationException? { var validationException: ActionRequestValidationException? = null - if (query.isEmpty()) { + if (pplQuery.isEmpty()) { validationException = ValidateActions.addValidationError("query is empty", validationException) } return validationException diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt index 73a7b1d5..be518265 100644 --- a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt @@ -52,6 +52,10 @@ class TransportPPLQueryResponse : BaseResponse { * {@inheritDoc} */ override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { - throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.") +// throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.") + builder!! + return builder.startObject() + .field("response", queryResponse) + .endObject() } } diff --git a/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt b/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt deleted file mode 100644 index 27944187..00000000 --- a/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.commons.sql.model - -import org.opensearch.commons.utils.EnumParser - -/** - * Enum for Notification config type - */ -enum class QueryType(val tag: String) { - NONE("none") { - override fun toString(): String { - return tag - } - }, - SQL("ppl") { - override fun toString(): String { - return tag - } - }, - PPL("sql") { - override fun toString(): String { - return tag - } - }; - companion object { - private val tagMap = values().associateBy { it.tag } - - val enumParser = EnumParser { fromTagOrDefault(it) } - - /** - * Get QueryType from tag or NONE if not found - * @param tag the tag - * @return QueryType corresponding to tag. NONE if invalid tag. - */ - fun fromTagOrDefault(tag: String): QueryType { - return tagMap[tag] ?: NONE - } - } -} diff --git a/src/main/kotlin/org/opensearch/commons/sql/model/Style.kt b/src/main/kotlin/org/opensearch/commons/sql/model/Style.kt new file mode 100644 index 00000000..65d5c63d --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/model/Style.kt @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.model + +/** + * Enum for PPL/SQL Response Formatter Style Type + */ +enum class Style { + PRETTY, COMPACT +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt index ceb7a1e3..79d0f03d 100644 --- a/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt +++ b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt @@ -16,6 +16,7 @@ import org.mockito.junit.jupiter.MockitoExtension import org.opensearch.action.ActionListener import org.opensearch.action.ActionType import org.opensearch.client.node.NodeClient +import org.opensearch.commons.sql.action.TransportPPLQueryRequest import org.opensearch.commons.sql.action.TransportPPLQueryResponse import org.opensearch.commons.sql.action.TransportSQLQueryResponse @@ -44,6 +45,8 @@ internal class SQLPluginInterfaceTest { @Test fun sendPPLQuery() { val query = "search source=accounts" + val path = "plugin/_ppl" + val format = "" val response = TransportPPLQueryResponse("sample response") val listener: ActionListener = Mockito.mock(ActionListener::class.java) as ActionListener @@ -53,7 +56,7 @@ internal class SQLPluginInterfaceTest { .onResponse(response) }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) - SQLPluginInterface.sendPPLQuery(client, query, listener) + SQLPluginInterface.sendPPLQuery(client, TransportPPLQueryRequest(query, path, format, threadContext = null), listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } } diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt index 1ce6f18d..4ae37901 100644 --- a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt @@ -17,7 +17,7 @@ internal class TransportPPLQueryRequestTest { expected: TransportPPLQueryRequest, actual: TransportPPLQueryRequest ) { - assertEquals(expected.query, actual.query) + assertEquals(expected.pplQuery, actual.pplQuery) assertEquals(expected.threadContext, actual.threadContext) assertNull(actual.validate()) } @@ -25,7 +25,9 @@ internal class TransportPPLQueryRequestTest { @Test fun `Send request serialize and deserialize transport object should be equal PPL`() { val query = "search source=accounts" - val request = TransportPPLQueryRequest(query, "sample-thread-context") + val path = "plugin/_ppl" + val format = "" + val request = TransportPPLQueryRequest(query, path, format, threadContext = "sample") val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } assertGetRequestEquals(request, recreatedObject) } @@ -33,7 +35,9 @@ internal class TransportPPLQueryRequestTest { @Test fun `Send query request validate return exception if query is empty`() { val query = "" - val request = TransportPPLQueryRequest(query, "sample-thread-context") + val path = "plugin/_ppl" + val format = "" + val request = TransportPPLQueryRequest(query, path, format, threadContext = "sample") val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } assertNotNull(recreatedObject.validate()) }