Skip to content

Commit

Permalink
add SQL/PPL transport request/response models and send SQL/PPL interf…
Browse files Browse the repository at this point in the history
…aces

Signed-off-by: Zhongnan Su <[email protected]>
  • Loading branch information
zhongnansu committed May 12, 2022
1 parent a80b8b1 commit 9f99617
Show file tree
Hide file tree
Showing 14 changed files with 644 additions and 1 deletion.
91 changes: 91 additions & 0 deletions src/main/kotlin/org/opensearch/commons/sql/SQLPluginInterface.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql

import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.sql.action.BaseResponse
import org.opensearch.commons.sql.action.SQLActions
import org.opensearch.commons.sql.action.TransportPPLQueryRequest
import org.opensearch.commons.sql.action.TransportPPLQueryResponse
import org.opensearch.commons.sql.action.TransportSQLQueryRequest
import org.opensearch.commons.sql.action.TransportSQLQueryResponse
import org.opensearch.commons.utils.SecureClientWrapper
import org.opensearch.commons.utils.recreateObject

/**
* All the transport action plugin interfaces for the SQL plugin
*/
object SQLPluginInterface {

/**
* Send SQL API enabled for a feature. No REST API. Internal API only for Inter plugin communication.
* @param client Node client for making transport action
* @param query The query string
* @param listener The listener for getting response
*/
fun sendSQLQuery(
client: NodeClient,
query: String,
listener: ActionListener<TransportSQLQueryResponse>
) {
val threadContext: String? =
client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val wrapper = SecureClientWrapper(client) // Executing request in privileged mode
wrapper.execute(
SQLActions.SEND_SQL_QUERY_ACTION_TYPE,
TransportSQLQueryRequest(query, threadContext),
wrapActionListener(listener) { response -> recreateObject(response) { TransportSQLQueryResponse(it) } }
)
}

/**
* Send PPL API enabled for a feature. No REST API. Internal API only for Inter plugin communication.
* @param client Node client for making transport action
* @param query The query string
* @param listener The listener for getting response
*/
fun sendPPLQuery(
client: NodeClient,
query: String,
listener: ActionListener<TransportPPLQueryResponse>
) {
val threadContext: String? =
client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val wrapper = SecureClientWrapper(client) // Executing request in privileged mode
wrapper.execute(
SQLActions.SEND_PPL_QUERY_ACTION_TYPE,
TransportPPLQueryRequest(query, threadContext),
wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } }
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
32 changes: 32 additions & 0 deletions src/main/kotlin/org/opensearch/commons/sql/action/BaseResponse.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.commons.sql.action

import org.opensearch.action.ActionResponse
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.rest.RestStatus
import java.io.IOException

/**
* Base response which give REST status.
*/
abstract class BaseResponse : ActionResponse, ToXContentObject {

/**
* constructor for creating the class
*/
constructor()

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input)

/**
* get rest status for the response. Useful override for multi-status response.
* @return RestStatus for the response
*/
open fun getStatus(): RestStatus {
return RestStatus.OK
}
}
28 changes: 28 additions & 0 deletions src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.commons.sql.action

import org.opensearch.action.ActionType

object SQLActions {

/**
* Send SQL query. Internal only - Inter plugin communication.
*/
const val SEND_SQL_QUERY_NAME = "cluster:admin/opensearch/sql"

/**
* Send PPL query. Internal only - Inter plugin communication.
*/
const val SEND_PPL_QUERY_NAME = "cluster:admin/opensearch/ppl"

/**
* Send SQL query transport action type.
*/
val SEND_SQL_QUERY_ACTION_TYPE =
ActionType(SEND_SQL_QUERY_NAME, ::TransportSQLQueryResponse)

/**
* Send PPL query transport action type.
*/
val SEND_PPL_QUERY_ACTION_TYPE =
ActionType(SEND_PPL_QUERY_NAME, ::TransportPPLQueryResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import java.io.IOException

/**
* Action Request to send ppl query.
*/
class TransportPPLQueryRequest : ActionRequest, ToXContentObject {
val query: String
val threadContext: String?

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportPPLQueryRequest(it) }
}

/**
* constructor for creating the class
* @param query the ppl query string
* @param threadContext the user info thread context
*/
constructor(
query: String,
threadContext: String?
) {
this.query = query
this.threadContext = threadContext
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
query = input.readString()
threadContext = input.readOptionalString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
super.writeTo(output)
output.writeString(query)
output.writeOptionalString(threadContext)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
throw IllegalStateException("Transport ppl request is not intended for REST or persistence and does not support XContent.")
}

/**
* {@inheritDoc}
*/
override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (query.isEmpty()) {
validationException = ValidateActions.addValidationError("query is empty", validationException)
}
return validationException
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import java.io.IOException

/**
* Action Response for ppl query.
*/
class TransportPPLQueryResponse : BaseResponse {
val queryResponse: String

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportPPLQueryResponse(it) }
}

/**
* constructor for creating the class
* @param queryResponse the ppl query response
*/
constructor(
queryResponse: String,
) {
this.queryResponse = queryResponse
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
queryResponse = input.readString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
output.writeString(queryResponse)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import java.io.IOException

/**
* Action Request to send sql query.
*/
class TransportSQLQueryRequest : ActionRequest, ToXContentObject {
val query: String
val threadContext: String?

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportSQLQueryRequest(it) }
}

/**
* constructor for creating the class
* @param query the sql query string
* @param threadContext the user info thread context
*/
constructor(
query: String,
threadContext: String?
) {
this.query = query
this.threadContext = threadContext
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
query = input.readString()
threadContext = input.readOptionalString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
super.writeTo(output)
output.writeString(query)
output.writeOptionalString(threadContext)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
throw IllegalStateException("Transport sql request is not intended for REST or persistence and does not support XContent.")
}

/**
* {@inheritDoc}
*/
override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (query.isEmpty()) {
validationException = ValidateActions.addValidationError("query is empty", validationException)
}
return validationException
}
}
Loading

0 comments on commit 9f99617

Please sign in to comment.