Skip to content

Commit

Permalink
Integrate with Notifications plugin for Alerting backend (#401)
Browse files Browse the repository at this point in the history
* Add utils for retrieving notifications to both legacy Destinations and Notification Channels

Signed-off-by: Mohammad Qureshi <[email protected]>

* Refactor runAction in MonitorRunner to be able to send notifications to either Notification Channels or Destination

Signed-off-by: Mohammad Qureshi <[email protected]>

* Fix error handling when Notification config is not found and support TEST_ACTION for tests

Signed-off-by: Mohammad Qureshi <[email protected]>

* Fix issue with fallback setting for Destination email keystore settings

Signed-off-by: Mohammad Qureshi <[email protected]>

* Add publishing email Destinations via Notifications passthrough API

Signed-off-by: Mohammad Qureshi <[email protected]>

* Remove unused code in NotificationApiUtils

Signed-off-by: Mohammad Qureshi <[email protected]>

* Use subject as the title for Email Channels

Signed-off-by: Mohammad Qureshi <[email protected]>

* Combine runAction() methods

Signed-off-by: Mohammad Qureshi <[email protected]>

* Pass accountName to LegacyEmailMessage

Signed-off-by: Mohammad Qureshi <[email protected]>

* Split retrieval of Destination configs in DestinationMigrationUtilService

Signed-off-by: Mohammad Qureshi <[email protected]>
  • Loading branch information
qreshi authored Apr 17, 2022
1 parent 396fc6d commit 4d8643c
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 217 deletions.
111 changes: 69 additions & 42 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.retry
Expand All @@ -54,18 +55,27 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
import org.opensearch.alerting.util.destinationmigration.createMessageContent
import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isBucketLevelMonitor
import org.opensearch.alerting.util.isTestAction
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.Strings
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -637,31 +647,22 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
}

private suspend fun runAction(action: Action, ctx: QueryLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult {
private suspend fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
if (!isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !isActionActionable(action, ctx.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}

val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
// TODO: Inject user here so only Destination/Notifications that the user has permissions to are retrieved
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
if (!destination.isAllowed(allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}

val destinationCtx = destinationContextFactory.getDestinationContext(destination)
actionOutput[MESSAGE_ID] = destination.publish(
actionOutput[SUBJECT],
actionOutput[MESSAGE]!!,
destinationCtx,
hostDenyList
)
actionOutput[MESSAGE_ID] = getConfigAndSendNotification(action, actionOutput[SUBJECT], actionOutput[MESSAGE]!!)
}
}
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
Expand All @@ -670,41 +671,67 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
}

// TODO: This is largely a duplicate of runAction above for BucketLevelTriggerExecutionContext for now.
// After suppression logic implementation, if this remains mostly the same, it can be refactored.
private suspend fun runAction(action: Action, ctx: BucketLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
if (!destination.isAllowed(allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}
private suspend fun getConfigAndSendNotification(action: Action, subject: String?, message: String): String {
val config = getConfigForNotificationAction(action)

val destinationCtx = destinationContextFactory.getDestinationContext(destination)
actionOutput[MESSAGE_ID] = destination.publish(
actionOutput[SUBJECT],
actionOutput[MESSAGE]!!,
destinationCtx,
hostDenyList
)
}
}
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
} catch (e: Exception) {
ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e)
if (config.destination == null && config.channel == null) {
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.id}]")
}

// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
// just for Alerting integration tests
if (config.destination?.isTestAction() == true) {
return "test action"
}

if (config.destination?.isAllowed(allowList) == false) {
throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
)
}

var actionResponseContent = ""
actionResponseContent = config.channel
?.sendNotification(
client,
config.channel.getTitle(subject),
config.channel.createMessageContent(subject, message)
) ?: actionResponseContent

actionResponseContent = config.destination
?.buildLegacyBaseMessage(subject, message, destinationContextFactory.getDestinationContext(config.destination))
?.publishLegacyNotification(client)
?: actionResponseContent

return actionResponseContent
}

private fun compileTemplate(template: Script, ctx: TriggerExecutionContext): String {
return scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

/**
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
*
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
*/
private suspend fun getConfigForNotificationAction(action: Action): NotificationActionConfigs {
var destination: Destination? = null
val channel: NotificationConfigInfo? = getNotificationConfigInfo(client as NodeClient, action.destinationId)

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
}
}

return NotificationActionConfigs(destination, channel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@

package org.opensearch.alerting.model.destination

import org.opensearch.alerting.opensearchapi.string
import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import java.io.IOException
import java.lang.IllegalStateException

Expand Down Expand Up @@ -71,12 +68,8 @@ data class Chime(val url: String) : ToXContent {
}
}

fun constructMessageContent(subject: String?, message: String?): String {
val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
val builder = XContentFactory.contentBuilder(XContentType.JSON)
builder.startObject()
.field("Content", messageContent)
.endObject()
return builder.string()
// Complete JSON structure is now constructed in the notification plugin
fun constructMessageContent(subject: String?, message: String): String {
return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,27 @@
package org.opensearch.alerting.model.destination

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.destination.Notification
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.alerting.destination.message.ChimeMessage
import org.opensearch.alerting.destination.message.CustomWebhookMessage
import org.opensearch.alerting.destination.message.EmailMessage
import org.opensearch.alerting.destination.message.SlackMessage
import org.opensearch.alerting.model.destination.email.Email
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.instant
import org.opensearch.alerting.opensearchapi.optionalTimeField
import org.opensearch.alerting.opensearchapi.optionalUserField
import org.opensearch.alerting.util.DestinationType
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.alerting.util.isHostInDenylist
import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertAlertingToNotificationMethodType
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import org.opensearch.commons.destination.message.LegacyBaseMessage
import org.opensearch.commons.destination.message.LegacyChimeMessage
import org.opensearch.commons.destination.message.LegacyCustomWebhookMessage
import org.opensearch.commons.destination.message.LegacyEmailMessage
import org.opensearch.commons.destination.message.LegacySlackMessage
import java.io.IOException
import java.net.InetAddress
import java.time.Instant
import java.util.Locale

Expand Down Expand Up @@ -238,72 +236,52 @@ data class Destination(
}
}

@Throws(IOException::class)
fun publish(
fun buildLegacyBaseMessage(
compiledSubject: String?,
compiledMessage: String,
destinationCtx: DestinationContext,
denyHostRanges: List<String>
): String {
destinationCtx: DestinationContext
): LegacyBaseMessage {

val destinationMessage: BaseMessage
val responseContent: String
val responseStatusCode: Int
val destinationMessage: LegacyBaseMessage
when (type) {
DestinationType.CHIME -> {
val messageContent = chime?.constructMessageContent(compiledSubject, compiledMessage)
destinationMessage = ChimeMessage.Builder(name)
destinationMessage = LegacyChimeMessage.Builder(name)
.withUrl(chime?.url)
.withMessage(messageContent)
.build()
}
DestinationType.SLACK -> {
val messageContent = slack?.constructMessageContent(compiledSubject, compiledMessage)
destinationMessage = SlackMessage.Builder(name)
destinationMessage = LegacySlackMessage.Builder(name)
.withUrl(slack?.url)
.withMessage(messageContent)
.build()
}
DestinationType.CUSTOM_WEBHOOK -> {
destinationMessage = CustomWebhookMessage.Builder(name)
.withUrl(customWebhook?.url)
.withScheme(customWebhook?.scheme)
.withHost(customWebhook?.host)
.withPort(customWebhook?.port)
.withPath(customWebhook?.path)
.withMethod(customWebhook?.method)
.withQueryParams(customWebhook?.queryParams)
destinationMessage = LegacyCustomWebhookMessage.Builder(name)
.withUrl(getLegacyCustomWebhookMessageURL(customWebhook))
.withHeaderParams(customWebhook?.headerParams)
.withMessage(compiledMessage).build()
}
DestinationType.EMAIL -> {
val emailAccount = destinationCtx.emailAccount
destinationMessage = EmailMessage.Builder(name)
destinationMessage = LegacyEmailMessage.Builder(name)
.withAccountName(emailAccount?.name)
.withHost(emailAccount?.host)
.withPort(emailAccount?.port)
.withMethod(emailAccount?.method?.value)
.withUserName(emailAccount?.username)
.withPassword(emailAccount?.password)
.withMethod(emailAccount?.method?.let { convertAlertingToNotificationMethodType(it).toString() })
.withFrom(emailAccount?.email)
.withRecipients(destinationCtx.recipients)
.withSubject(compiledSubject)
.withMessage(compiledMessage).build()
}
DestinationType.TEST_ACTION -> {
return "test action"
}
else -> throw IllegalArgumentException("Unsupported Destination type [$type] for building legacy message")
}

validateDestinationUri(destinationMessage, denyHostRanges)
val response = Notification.publish(destinationMessage) as org.opensearch.alerting.destination.response.DestinationResponse
responseContent = response.responseContent
responseStatusCode = response.statusCode

logger.info("Message published for action name: $name, messageid: $responseContent, statuscode: $responseStatusCode")
return responseContent
return destinationMessage
}

fun constructResponseForDestinationType(type: DestinationType): Any {
private fun constructResponseForDestinationType(type: DestinationType): Any {
var content: Any? = null
when (type) {
DestinationType.CHIME -> content = chime?.convertToMap()?.get(type.value)
Expand All @@ -318,13 +296,14 @@ data class Destination(
return content
}

private fun validateDestinationUri(destinationMessage: BaseMessage, denyHostRanges: List<String>) {
if (destinationMessage.isHostInDenylist(denyHostRanges)) {
logger.error(
"Host: {} resolves to: {} which is in denylist: {}.", destinationMessage.uri.host,
InetAddress.getByName(destinationMessage.uri.host), denyHostRanges
)
throw IOException("The destination address is invalid.")
}
private fun getLegacyCustomWebhookMessageURL(customWebhook: CustomWebhook?): String {
return LegacyCustomWebhookMessage.Builder(name)
.withUrl(customWebhook?.url)
.withScheme(customWebhook?.scheme)
.withHost(customWebhook?.host)
.withPort(customWebhook?.port)
.withPath(customWebhook?.path)
.withQueryParams(customWebhook?.queryParams)
.build().uri.toString()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@

package org.opensearch.alerting.model.destination

import org.opensearch.alerting.opensearchapi.string
import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import java.io.IOException
import java.lang.IllegalStateException

Expand Down Expand Up @@ -71,12 +68,8 @@ data class Slack(val url: String) : ToXContent {
}
}

// Complete JSON structure is now constructed in the notification plugin
fun constructMessageContent(subject: String?, message: String): String {
val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
val builder = XContentFactory.contentBuilder(XContentType.JSON)
builder.startObject()
.field("text", messageContent)
.endObject()
return builder.string()
return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
}
}
Loading

0 comments on commit 4d8643c

Please sign in to comment.