Skip to content

Commit

Permalink
Added transport action support for sending notification API
Browse files Browse the repository at this point in the history
  • Loading branch information
akbhatta committed Nov 9, 2020
1 parent e52394d commit 6911c78
Show file tree
Hide file tree
Showing 25 changed files with 705 additions and 339 deletions.
9 changes: 9 additions & 0 deletions .idea/codeStyles/Project.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@

package com.amazon.opendistroforelasticsearch.notifications

import com.amazon.opendistroforelasticsearch.notifications.resthandler.SendRestHandler
import com.amazon.opendistroforelasticsearch.notifications.action.SendMessageAction
import com.amazon.opendistroforelasticsearch.notifications.resthandler.SendMessageRestHandler
import com.amazon.opendistroforelasticsearch.notifications.settings.PluginSettings
import com.amazon.opendistroforelasticsearch.notifications.throttle.Accountant
import com.amazon.opendistroforelasticsearch.notifications.util.logger
Expand Down Expand Up @@ -97,7 +98,9 @@ internal class NotificationPlugin : ActionPlugin, Plugin() {
*/
override fun getActions(): List<ActionPlugin.ActionHandler<out ActionRequest, out ActionResponse>> {
log.debug("$LOG_PREFIX:getActions")
return listOf()
return listOf(
ActionPlugin.ActionHandler(SendMessageAction.ACTION_TYPE, SendMessageAction::class.java)
)
}

/**
Expand All @@ -114,7 +117,7 @@ internal class NotificationPlugin : ActionPlugin, Plugin() {
): List<RestHandler> {
log.debug("$LOG_PREFIX:getRestHandlers")
return listOf(
SendRestHandler()
SendMessageRestHandler()
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.notifications.action

import com.amazon.opendistroforelasticsearch.notifications.NotificationPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.notifications.util.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.elasticsearch.ElasticsearchSecurityException
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.ActionRequest
import org.elasticsearch.action.ActionResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.common.io.stream.Writeable
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.engine.VersionConflictEngineException
import org.elasticsearch.indices.InvalidIndexNameException
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
import java.io.IOException

internal abstract class PluginBaseAction<Request : ActionRequest, Response : ActionResponse>(
name: String,
transportService: TransportService,
actionFilters: ActionFilters,
requestReader: Writeable.Reader<Request>
) : HandledTransportAction<Request, Response>(name, transportService, actionFilters, requestReader) {
companion object {
private val log by logger(PluginBaseAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
}

/**
* {@inheritDoc}
*/
@Suppress("TooGenericExceptionCaught")
override fun doExecute(
task: Task?,
request: Request,
listener: ActionListener<Response>
) {
scope.launch {
try {
listener.onResponse(executeRequest(request))
} catch (exception: ElasticsearchStatusException) {
log.warn("$LOG_PREFIX:ElasticsearchStatusException:", exception)
listener.onFailure(exception)
} catch (exception: ElasticsearchSecurityException) {
log.warn("$LOG_PREFIX:ElasticsearchSecurityException:", exception)
listener.onFailure(ElasticsearchStatusException("Permissions denied: ${exception.message} - Contact administrator",
RestStatus.FORBIDDEN))
} catch (exception: VersionConflictEngineException) {
log.warn("$LOG_PREFIX:VersionConflictEngineException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.CONFLICT))
} catch (exception: IndexNotFoundException) {
log.warn("$LOG_PREFIX:IndexNotFoundException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.NOT_FOUND))
} catch (exception: InvalidIndexNameException) {
log.warn("$LOG_PREFIX:InvalidIndexNameException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.BAD_REQUEST))
} catch (exception: IllegalArgumentException) {
log.warn("$LOG_PREFIX:IllegalArgumentException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.BAD_REQUEST))
} catch (exception: IllegalStateException) {
log.warn("$LOG_PREFIX:IllegalStateException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.SERVICE_UNAVAILABLE))
} catch (exception: IOException) {
log.error("$LOG_PREFIX:Uncaught IOException:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.FAILED_DEPENDENCY))
} catch (exception: Exception) {
log.error("$LOG_PREFIX:Uncaught Exception:", exception)
listener.onFailure(ElasticsearchStatusException(exception.message, RestStatus.INTERNAL_SERVER_ERROR))
}
}
}

/**
* Execute the transport request
* @param request the request to execute
* @return the response to return.
*/
abstract fun executeRequest(request: Request): Response
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.opendistroforelasticsearch.notifications.action

import com.amazon.opendistroforelasticsearch.notifications.NotificationPlugin.Companion.LOG_PREFIX
import com.amazon.opendistroforelasticsearch.notifications.channel.ChannelFactory
import com.amazon.opendistroforelasticsearch.notifications.model.ChannelMessageResponse
import com.amazon.opendistroforelasticsearch.notifications.model.SendMessageRequest
import com.amazon.opendistroforelasticsearch.notifications.model.SendMessageResponse
import com.amazon.opendistroforelasticsearch.notifications.throttle.Accountant
import com.amazon.opendistroforelasticsearch.notifications.throttle.Counters
import com.amazon.opendistroforelasticsearch.notifications.util.logger
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.action.ActionType
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.client.Client
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.TransportService

/**
* Send message action for send notification request.
*/
internal class SendMessageAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val xContentRegistry: NamedXContentRegistry
) : PluginBaseAction<SendMessageRequest, SendMessageResponse>(NAME,
transportService,
actionFilters,
::SendMessageRequest) {
companion object {
private const val NAME = "cluster:admin/opendistro/notifications/send"
internal val ACTION_TYPE = ActionType(NAME, ::SendMessageResponse)
private val log by logger(SendMessageAction::class.java)
}

/**
* {@inheritDoc}
*/
override fun executeRequest(request: SendMessageRequest): SendMessageResponse {
log.debug("$LOG_PREFIX:send")
if (!isMessageQuotaAvailable(request)) {
log.info("$LOG_PREFIX:${request.refTag}:Message Sending quota not available")
throw ElasticsearchStatusException("Message Sending quota not available", RestStatus.TOO_MANY_REQUESTS)
}
val statusList: List<ChannelMessageResponse> = sendMessagesInParallel(request)
statusList.forEach {
log.info("$LOG_PREFIX:${request.refTag}:statusCode=${it.statusCode}, statusText=${it.statusText}")
}
return SendMessageResponse(request.refTag, statusList)
}

private fun sendMessagesInParallel(sendMessageRequest: SendMessageRequest): List<ChannelMessageResponse> {
val counters = Counters()
counters.requestCount.incrementAndGet()
val statusList: List<ChannelMessageResponse>
// Fire all the message sending in parallel
runBlocking {
val statusDeferredList = sendMessageRequest.recipients.map {
async(Dispatchers.IO) { sendMessageToChannel(it, sendMessageRequest, counters) }
}
statusList = statusDeferredList.awaitAll()
}
// After all operation are executed, update the counters
Accountant.incrementCounters(counters)
return statusList
}

private fun sendMessageToChannel(recipient: String, sendMessageRequest: SendMessageRequest, counters: Counters): ChannelMessageResponse {
val channel = ChannelFactory.getNotificationChannel(recipient)
return channel.sendMessage(sendMessageRequest.refTag, recipient, sendMessageRequest.channelMessage, counters)
}

private fun isMessageQuotaAvailable(sendMessageRequest: SendMessageRequest): Boolean {
val counters = Counters()
sendMessageRequest.recipients.forEach {
ChannelFactory.getNotificationChannel(it).updateCounter(sendMessageRequest.refTag, it, sendMessageRequest.channelMessage, counters)
}
return Accountant.isMessageQuotaAvailable(counters)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.amazon.opendistroforelasticsearch.notifications.channel

import com.amazon.opendistroforelasticsearch.notifications.core.ChannelMessage
import com.amazon.opendistroforelasticsearch.notifications.core.ChannelMessageResponse
import com.amazon.opendistroforelasticsearch.notifications.model.ChannelMessage
import com.amazon.opendistroforelasticsearch.notifications.model.ChannelMessageResponse
import com.amazon.opendistroforelasticsearch.notifications.throttle.Counters
import org.elasticsearch.rest.RestStatus

Expand All @@ -29,7 +29,8 @@ internal object EmptyChannel : NotificationChannel {
* {@inheritDoc}
*/
override fun sendMessage(refTag: String, recipient: String, channelMessage: ChannelMessage, counter: Counters): ChannelMessageResponse {
return ChannelMessageResponse(RestStatus.UNPROCESSABLE_ENTITY,
return ChannelMessageResponse(recipient,
RestStatus.UNPROCESSABLE_ENTITY,
"No Configured Channel for recipient type:${recipient.substringBefore(':', "empty")}")
}

Expand Down
Loading

0 comments on commit 6911c78

Please sign in to comment.