diff --git a/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt new file mode 100644 index 00000000..4c738018 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt @@ -0,0 +1,91 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql + +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionResponse +import org.opensearch.client.node.NodeClient +import org.opensearch.common.io.stream.Writeable +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.sql.action.BaseResponse +import org.opensearch.commons.sql.action.SQLActions +import org.opensearch.commons.sql.action.TransportPPLQueryRequest +import org.opensearch.commons.sql.action.TransportPPLQueryResponse +import org.opensearch.commons.sql.action.TransportSQLQueryRequest +import org.opensearch.commons.sql.action.TransportSQLQueryResponse +import org.opensearch.commons.utils.SecureClientWrapper +import org.opensearch.commons.utils.recreateObject + +/** + * All the transport action plugin interfaces for the SQL plugin + */ +object SQLPluginInterface { + + /** + * Send SQL API enabled for a feature. No REST API. Internal API only for Inter plugin communication. + * @param client Node client for making transport action + * @param query The query string + * @param listener The listener for getting response + */ + fun sendSQLQuery( + client: NodeClient, + query: String, + listener: ActionListener + ) { + val threadContext: String? = + client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val wrapper = SecureClientWrapper(client) // Executing request in privileged mode + wrapper.execute( + SQLActions.SEND_SQL_QUERY_ACTION_TYPE, + TransportSQLQueryRequest(query, threadContext), + wrapActionListener(listener) { response -> recreateObject(response) { TransportSQLQueryResponse(it) } } + ) + } + + /** + * Send PPL API enabled for a feature. No REST API. Internal API only for Inter plugin communication. + * @param client Node client for making transport action + * @param query The query string + * @param listener The listener for getting response + */ + fun sendPPLQuery( + client: NodeClient, + query: String, + listener: ActionListener + ) { + val threadContext: String? = + client.threadPool().threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + val wrapper = SecureClientWrapper(client) // Executing request in privileged mode + wrapper.execute( + SQLActions.SEND_PPL_QUERY_ACTION_TYPE, + TransportPPLQueryRequest(query, threadContext), + wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } } + ) + } + + /** + * Wrap action listener on concrete response class by a new created one on ActionResponse. + * This is required because the response may be loaded by different classloader across plugins. + * The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate + * the response object. + */ + @Suppress("UNCHECKED_CAST") + private fun wrapActionListener( + listener: ActionListener, + recreate: (Writeable) -> Response + ): ActionListener { + return object : ActionListener { + override fun onResponse(response: ActionResponse) { + val recreated = response as? Response ?: recreate(response) + listener.onResponse(recreated) + } + + override fun onFailure(exception: java.lang.Exception) { + listener.onFailure(exception) + } + } as ActionListener + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt new file mode 100644 index 00000000..763072ca --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt @@ -0,0 +1,32 @@ +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionResponse +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.rest.RestStatus +import java.io.IOException + +/** + * Base response which give REST status. + */ +abstract class BaseResponse : ActionResponse, ToXContentObject { + + /** + * constructor for creating the class + */ + constructor() + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) + + /** + * get rest status for the response. Useful override for multi-status response. + * @return RestStatus for the response + */ + open fun getStatus(): RestStatus { + return RestStatus.OK + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt b/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt new file mode 100644 index 00000000..e5c69cdd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt @@ -0,0 +1,28 @@ +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionType + +object SQLActions { + + /** + * Send SQL query. Internal only - Inter plugin communication. + */ + const val SEND_SQL_QUERY_NAME = "cluster:admin/opensearch/sql" + + /** + * Send PPL query. Internal only - Inter plugin communication. + */ + const val SEND_PPL_QUERY_NAME = "cluster:admin/opensearch/ppl" + + /** + * Send SQL query transport action type. + */ + val SEND_SQL_QUERY_ACTION_TYPE = + ActionType(SEND_SQL_QUERY_NAME, ::TransportSQLQueryResponse) + + /** + * Send PPL query transport action type. + */ + val SEND_PPL_QUERY_ACTION_TYPE = + ActionType(SEND_PPL_QUERY_NAME, ::TransportPPLQueryResponse) +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt new file mode 100644 index 00000000..77271bfd --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequest.kt @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Request to send ppl query. + */ +class TransportPPLQueryRequest : ActionRequest, ToXContentObject { + val query: String + val threadContext: String? + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportPPLQueryRequest(it) } + } + + /** + * constructor for creating the class + * @param query the ppl query string + * @param threadContext the user info thread context + */ + constructor( + query: String, + threadContext: String? + ) { + this.query = query + this.threadContext = threadContext + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + query = input.readString() + threadContext = input.readOptionalString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + super.writeTo(output) + output.writeString(query) + output.writeOptionalString(threadContext) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport ppl request is not intended for REST or persistence and does not support XContent.") + } + + /** + * {@inheritDoc} + */ + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (query.isEmpty()) { + validationException = ValidateActions.addValidationError("query is empty", validationException) + } + return validationException + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt new file mode 100644 index 00000000..73a7b1d5 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponse.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Response for ppl query. + */ +class TransportPPLQueryResponse : BaseResponse { + val queryResponse: String + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportPPLQueryResponse(it) } + } + + /** + * constructor for creating the class + * @param queryResponse the ppl query response + */ + constructor( + queryResponse: String, + ) { + this.queryResponse = queryResponse + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + queryResponse = input.readString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + output.writeString(queryResponse) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.") + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt new file mode 100644 index 00000000..5e59512c --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequest.kt @@ -0,0 +1,79 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.ValidateActions +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.ToXContentObject +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Request to send sql query. + */ +class TransportSQLQueryRequest : ActionRequest, ToXContentObject { + val query: String + val threadContext: String? + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportSQLQueryRequest(it) } + } + + /** + * constructor for creating the class + * @param query the sql query string + * @param threadContext the user info thread context + */ + constructor( + query: String, + threadContext: String? + ) { + this.query = query + this.threadContext = threadContext + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + query = input.readString() + threadContext = input.readOptionalString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + super.writeTo(output) + output.writeString(query) + output.writeOptionalString(threadContext) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport sql request is not intended for REST or persistence and does not support XContent.") + } + + /** + * {@inheritDoc} + */ + override fun validate(): ActionRequestValidationException? { + var validationException: ActionRequestValidationException? = null + if (query.isEmpty()) { + validationException = ValidateActions.addValidationError("query is empty", validationException) + } + return validationException + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt new file mode 100644 index 00000000..e3cb0321 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponse.kt @@ -0,0 +1,57 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.opensearch.common.io.stream.StreamInput +import org.opensearch.common.io.stream.StreamOutput +import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.ToXContent +import org.opensearch.common.xcontent.XContentBuilder +import java.io.IOException + +/** + * Action Response for sql query. + */ +class TransportSQLQueryResponse : BaseResponse { + val queryResponse: String + + companion object { + /** + * reader to create instance of class from writable. + */ + val reader = Writeable.Reader { TransportSQLQueryResponse(it) } + } + + /** + * constructor for creating the class + * @param queryResponse the sql query response + */ + constructor( + queryResponse: String, + ) { + this.queryResponse = queryResponse + } + + /** + * {@inheritDoc} + */ + @Throws(IOException::class) + constructor(input: StreamInput) : super(input) { + queryResponse = input.readString() + } + + @Throws(IOException::class) + override fun writeTo(output: StreamOutput) { + output.writeString(queryResponse) + } + + /** + * {@inheritDoc} + */ + override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder { + throw IllegalStateException("Transport sql response is not intended for REST or persistence and does not support XContent.") + } +} diff --git a/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt b/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt new file mode 100644 index 00000000..27944187 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/sql/model/QueryType.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.model + +import org.opensearch.commons.utils.EnumParser + +/** + * Enum for Notification config type + */ +enum class QueryType(val tag: String) { + NONE("none") { + override fun toString(): String { + return tag + } + }, + SQL("ppl") { + override fun toString(): String { + return tag + } + }, + PPL("sql") { + override fun toString(): String { + return tag + } + }; + companion object { + private val tagMap = values().associateBy { it.tag } + + val enumParser = EnumParser { fromTagOrDefault(it) } + + /** + * Get QueryType from tag or NONE if not found + * @param tag the tag + * @return QueryType corresponding to tag. NONE if invalid tag. + */ + fun fromTagOrDefault(tag: String): QueryType { + return tagMap[tag] ?: NONE + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt index 049dabdc..7ed91f96 100644 --- a/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt +++ b/src/main/kotlin/org/opensearch/commons/utils/TransportHelpers.kt @@ -26,7 +26,7 @@ val STRING_WRITER = Writeable.Writer { output: StreamOutput, value: String -> * This method needs to be inline and reified so that when this is called from * doExecute() of transport action, the object may be created from other JVM. */ -inline fun recreateObject(writeable: Writeable, block: (StreamInput) -> Request): Request { +inline fun recreateObject(writeable: Writeable, block: (StreamInput) -> Request): Request { ByteArrayOutputStream().use { byteArrayOutputStream -> OutputStreamStreamOutput(byteArrayOutputStream).use { writeable.writeTo(it) diff --git a/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt new file mode 100644 index 00000000..ceb7a1e3 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/SQLPluginInterfaceTest.kt @@ -0,0 +1,59 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql + +import com.nhaarman.mockitokotlin2.whenever +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import org.mockito.Answers +import org.mockito.ArgumentMatchers +import org.mockito.Mock +import org.mockito.Mockito +import org.mockito.junit.jupiter.MockitoExtension +import org.opensearch.action.ActionListener +import org.opensearch.action.ActionType +import org.opensearch.client.node.NodeClient +import org.opensearch.commons.sql.action.TransportPPLQueryResponse +import org.opensearch.commons.sql.action.TransportSQLQueryResponse + +@Suppress("UNCHECKED_CAST") +@ExtendWith(MockitoExtension::class) +internal class SQLPluginInterfaceTest { + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + private lateinit var client: NodeClient + + @Test + fun sendSQLQuery() { + val query = "SELECT * FROM account;" + val response = TransportSQLQueryResponse("sample response") + val listener: ActionListener = + Mockito.mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + SQLPluginInterface.sendSQLQuery(client, query, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + + @Test + fun sendPPLQuery() { + val query = "search source=accounts" + val response = TransportPPLQueryResponse("sample response") + val listener: ActionListener = + Mockito.mock(ActionListener::class.java) as ActionListener + + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + + SQLPluginInterface.sendPPLQuery(client, query, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt new file mode 100644 index 00000000..1ce6f18d --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryRequestTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportPPLQueryRequestTest { + + private fun assertGetRequestEquals( + expected: TransportPPLQueryRequest, + actual: TransportPPLQueryRequest + ) { + assertEquals(expected.query, actual.query) + assertEquals(expected.threadContext, actual.threadContext) + assertNull(actual.validate()) + } + + @Test + fun `Send request serialize and deserialize transport object should be equal PPL`() { + val query = "search source=accounts" + val request = TransportPPLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } + assertGetRequestEquals(request, recreatedObject) + } + + @Test + fun `Send query request validate return exception if query is empty`() { + val query = "" + val request = TransportPPLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportPPLQueryRequest(it) } + assertNotNull(recreatedObject.validate()) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt new file mode 100644 index 00000000..211847c2 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportPPLQueryResponseTest.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportPPLQueryResponseTest { + @Test + fun `Create response serialize and deserialize transport object should be equal`() { + val queryResponse = TransportPPLQueryResponse("response string") + val recreatedObject = recreateObject(queryResponse) { TransportPPLQueryResponse(it) } + assertEquals(queryResponse.queryResponse, recreatedObject.queryResponse) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt new file mode 100644 index 00000000..3308f4ab --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryRequestTest.kt @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertNull +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportSQLQueryRequestTest { + + private fun assertGetRequestEquals( + expected: TransportSQLQueryRequest, + actual: TransportSQLQueryRequest + ) { + assertEquals(expected.query, actual.query) + assertEquals(expected.threadContext, actual.threadContext) + assertNull(actual.validate()) + } + + @Test + fun `Send request serialize and deserialize transport object should be equal SQL`() { + val query = "SELECT * FROM account;" + val request = TransportSQLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportSQLQueryRequest(it) } + assertGetRequestEquals(request, recreatedObject) + } + + @Test + fun `Send query request validate return exception if query is empty`() { + val query = "" + val request = TransportSQLQueryRequest(query, "sample-thread-context") + val recreatedObject = recreateObject(request) { TransportSQLQueryRequest(it) } + assertNotNull(recreatedObject.validate()) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt new file mode 100644 index 00000000..3a7b3eae --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/sql/action/TransportSQLQueryResponseTest.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.sql.action + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.opensearch.commons.utils.recreateObject + +internal class TransportSQLQueryResponseTest { + @Test + fun `Create response serialize and deserialize transport object should be equal`() { + val queryResponse = TransportSQLQueryResponse("response string") + val recreatedObject = recreateObject(queryResponse) { TransportSQLQueryResponse(it) } + assertEquals(queryResponse.queryResponse, recreatedObject.queryResponse) + } +}