Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add SQL/PPL transport request/response models #155

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql

import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.ConfigConstants
import org.opensearch.commons.sql.action.BaseResponse
import org.opensearch.commons.sql.action.SQLActions
import org.opensearch.commons.sql.action.TransportPPLQueryRequest
import org.opensearch.commons.sql.action.TransportPPLQueryResponse
import org.opensearch.commons.sql.action.TransportSQLQueryRequest
import org.opensearch.commons.sql.action.TransportSQLQueryResponse
import org.opensearch.commons.utils.SecureClientWrapper
import org.opensearch.commons.utils.recreateObject

/**
* All the transport action plugin interfaces for the SQL plugin
*/
object SQLPluginInterface {

/**
* Send SQL API enabled for a feature. No REST API. Internal API only for Inter plugin communication.
* @param client Node client for making transport action
* @param query The query string
* @param listener The listener for getting response
*/
fun sendSQLQuery(
client: NodeClient,
query: String,
listener: ActionListener<TransportSQLQueryResponse>
) {
val threadContext: String? =
client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
val wrapper = SecureClientWrapper(client) // Executing request in privileged mode
wrapper.execute(
SQLActions.SEND_SQL_QUERY_ACTION_TYPE,
TransportSQLQueryRequest(query, threadContext),
wrapActionListener(listener) { response -> recreateObject(response) { TransportSQLQueryResponse(it) } }
)
}

/**
* Send PPL API enabled for a feature. No REST API. Internal API only for Inter plugin communication.
* @param client Node client for making transport action
* @param query The query string
* @param listener The listener for getting response
*/
fun sendPPLQuery(
client: NodeClient,
request: TransportPPLQueryRequest,
listener: ActionListener<TransportPPLQueryResponse>
) {
// val threadContext: String? =
// client.threadPool().threadContext.getTransient<String>(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT)
// val wrapper = SecureClientWrapper(client) // Executing request in privileged mode
client.execute(
SQLActions.SEND_PPL_QUERY_ACTION_TYPE,
request,
wrapActionListener(listener) { response -> recreateObject(response) { TransportPPLQueryResponse(it) } }
)
}

/**
* Wrap action listener on concrete response class by a new created one on ActionResponse.
* This is required because the response may be loaded by different classloader across plugins.
* The onResponse(ActionResponse) avoids type cast exception and give a chance to recreate
* the response object.
*/
@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
recreate: (Writeable) -> Response
): ActionListener<Response> {
return object : ActionListener<ActionResponse> {
override fun onResponse(response: ActionResponse) {
val recreated = response as? Response ?: recreate(response)
listener.onResponse(recreated)
}

override fun onFailure(exception: java.lang.Exception) {
listener.onFailure(exception)
}
} as ActionListener<Response>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.opensearch.commons.sql.action

import org.opensearch.action.ActionResponse
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.rest.RestStatus
import java.io.IOException

/**
* Base response which give REST status.
*/
abstract class BaseResponse : ActionResponse, ToXContentObject {

/**
* constructor for creating the class
*/
constructor()

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input)

/**
* get rest status for the response. Useful override for multi-status response.
* @return RestStatus for the response
*/
open fun getStatus(): RestStatus {
return RestStatus.OK
}
}
28 changes: 28 additions & 0 deletions src/main/kotlin/org/opensearch/commons/sql/action/SQLActions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.opensearch.commons.sql.action

import org.opensearch.action.ActionType

object SQLActions {
zhongnansu marked this conversation as resolved.
Show resolved Hide resolved

/**
* Send SQL query. Internal only - Inter plugin communication.
*/
const val SEND_SQL_QUERY_NAME = "cluster:admin/opensearch/sql"

/**
* Send PPL query. Internal only - Inter plugin communication.
*/
const val SEND_PPL_QUERY_NAME = "cluster:admin/opensearch/ppl"

/**
* Send SQL query transport action type.
*/
val SEND_SQL_QUERY_ACTION_TYPE =
ActionType(SEND_SQL_QUERY_NAME, ::TransportSQLQueryResponse)

/**
* Send PPL query transport action type.
*/
val SEND_PPL_QUERY_ACTION_TYPE =
ActionType(SEND_PPL_QUERY_NAME, ::TransportPPLQueryResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.commons.sql.model.Style
import java.io.IOException

/**
* Action Request to send ppl query.
*/
class TransportPPLQueryRequest : ActionRequest, ToXContentObject {
val pplQuery: String
val path: String
var format: String
var sanitize: Boolean?
var style: Style?
val threadContext: String?

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportPPLQueryRequest(it) }
}

/**
* constructor for creating the class
* @param query the ppl query string
* @param threadContext the user info thread context
*/
constructor(
query: String,
path: String,
format: String = "",
sanitize: Boolean? = true,
style: Style? = Style.COMPACT,
threadContext: String?
) {
this.pplQuery = query
this.path = path
this.format = format
this.sanitize = sanitize
this.style = style
this.threadContext = threadContext
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
pplQuery = input.readString()
path = input.readString()
format = input.readString()
sanitize = input.readOptionalBoolean()
style = input.readEnum(Style::class.java)
threadContext = input.readOptionalString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
super.writeTo(output)
output.writeString(pplQuery)
output.writeString(path)
output.writeOptionalString(format)
output.writeOptionalBoolean(sanitize)
output.writeEnum(style)
output.writeOptionalString(threadContext)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder? {
return null
}

/**
* {@inheritDoc}
*/
override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (pplQuery.isEmpty()) {
validationException = ValidateActions.addValidationError("query is empty", validationException)
}
return validationException
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import java.io.IOException

/**
* Action Response for ppl query.
*/
class TransportPPLQueryResponse : BaseResponse {
val queryResponse: String

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportPPLQueryResponse(it) }
}

/**
* constructor for creating the class
* @param queryResponse the ppl query response
*/
constructor(
queryResponse: String,
) {
this.queryResponse = queryResponse
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
queryResponse = input.readString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
output.writeString(queryResponse)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
// throw IllegalStateException("Transport ppl response is not intended for REST or persistence and does not support XContent.")
builder!!
return builder.startObject()
.field("response", queryResponse)
.endObject()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.sql.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.ValidateActions
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.io.stream.Writeable
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.ToXContentObject
import org.opensearch.common.xcontent.XContentBuilder
import java.io.IOException

/**
* Action Request to send sql query.
*/
class TransportSQLQueryRequest : ActionRequest, ToXContentObject {
val query: String
val threadContext: String?

companion object {
/**
* reader to create instance of class from writable.
*/
val reader = Writeable.Reader { TransportSQLQueryRequest(it) }
}

/**
* constructor for creating the class
* @param query the sql query string
* @param threadContext the user info thread context
*/
constructor(
query: String,
threadContext: String?
) {
this.query = query
this.threadContext = threadContext
}

/**
* {@inheritDoc}
*/
@Throws(IOException::class)
constructor(input: StreamInput) : super(input) {
query = input.readString()
threadContext = input.readOptionalString()
}

@Throws(IOException::class)
override fun writeTo(output: StreamOutput) {
super.writeTo(output)
output.writeString(query)
output.writeOptionalString(threadContext)
}

/**
* {@inheritDoc}
*/
override fun toXContent(builder: XContentBuilder?, params: ToXContent.Params?): XContentBuilder {
throw IllegalStateException("Transport sql request is not intended for REST or persistence and does not support XContent.")
}

/**
* {@inheritDoc}
*/
override fun validate(): ActionRequestValidationException? {
var validationException: ActionRequestValidationException? = null
if (query.isEmpty()) {
validationException = ValidateActions.addValidationError("query is empty", validationException)
}
return validationException
}
}
Loading