Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate with Notifications plugin for Alerting backend #401

Merged
merged 10 commits into from
Apr 17, 2022
111 changes: 69 additions & 42 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.alerting.model.action.ActionExecutionScope
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.model.action.PerExecutionActionScope
import org.opensearch.alerting.model.destination.Destination
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.retry
Expand All @@ -54,18 +55,27 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST
import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings.Companion.HOST_DENY_LIST_NONE
import org.opensearch.alerting.util.destinationmigration.NotificationActionConfigs
import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.getNotificationConfigInfo
import org.opensearch.alerting.util.destinationmigration.createMessageContent
import org.opensearch.alerting.util.destinationmigration.getTitle
import org.opensearch.alerting.util.destinationmigration.publishLegacyNotification
import org.opensearch.alerting.util.destinationmigration.sendNotification
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.util.isAllowed
import org.opensearch.alerting.util.isBucketLevelMonitor
import org.opensearch.alerting.util.isTestAction
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.Strings
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.commons.notifications.model.NotificationConfigInfo
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -637,31 +647,22 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
}

private suspend fun runAction(action: Action, ctx: QueryLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult {
private suspend fun runAction(action: Action, ctx: TriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
if (!isActionActionable(action, ctx.alert)) {
if (ctx is QueryLevelTriggerExecutionContext && !isActionActionable(action, ctx.alert)) {
return ActionRunResult(action.id, action.name, mapOf(), true, null, null)
}

val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
// TODO: Inject user here so only Destination/Notifications that the user has permissions to are retrieved
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
if (!destination.isAllowed(allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}

val destinationCtx = destinationContextFactory.getDestinationContext(destination)
actionOutput[MESSAGE_ID] = destination.publish(
actionOutput[SUBJECT],
actionOutput[MESSAGE]!!,
destinationCtx,
hostDenyList
)
actionOutput[MESSAGE_ID] = getConfigAndSendNotification(action, actionOutput[SUBJECT], actionOutput[MESSAGE]!!)
}
}
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
Expand All @@ -670,41 +671,67 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}
}

// TODO: This is largely a duplicate of runAction above for BucketLevelTriggerExecutionContext for now.
// After suppression logic implementation, if this remains mostly the same, it can be refactored.
private suspend fun runAction(action: Action, ctx: BucketLevelTriggerExecutionContext, dryrun: Boolean): ActionRunResult {
return try {
val actionOutput = mutableMapOf<String, String>()
actionOutput[SUBJECT] = if (action.subjectTemplate != null) compileTemplate(action.subjectTemplate, ctx) else ""
actionOutput[MESSAGE] = compileTemplate(action.messageTemplate, ctx)
if (Strings.isNullOrEmpty(actionOutput[MESSAGE])) {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
withContext(Dispatchers.IO) {
val destination = AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
if (!destination.isAllowed(allowList)) {
throw IllegalStateException("Monitor contains a Destination type that is not allowed: ${destination.type}")
}
private suspend fun getConfigAndSendNotification(action: Action, subject: String?, message: String): String {
val config = getConfigForNotificationAction(action)

val destinationCtx = destinationContextFactory.getDestinationContext(destination)
actionOutput[MESSAGE_ID] = destination.publish(
actionOutput[SUBJECT],
actionOutput[MESSAGE]!!,
destinationCtx,
hostDenyList
)
}
}
ActionRunResult(action.id, action.name, actionOutput, false, currentTime(), null)
} catch (e: Exception) {
ActionRunResult(action.id, action.name, mapOf(), false, currentTime(), e)
if (config.destination == null && config.channel == null) {
throw IllegalStateException("Unable to find a Notification Channel or Destination config with id [${action.id}]")
}

// Adding a check on TEST_ACTION Destination type here to avoid supporting it as a LegacyBaseMessage type
// just for Alerting integration tests
if (config.destination?.isTestAction() == true) {
return "test action"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we define this constant somewhere else and use it both at the time assert and here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

}

if (config.destination?.isAllowed(allowList) == false) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we rename allowList to destinationsAllowList for better readability.

throw IllegalStateException(
"Monitor contains a Destination type that is not allowed: ${config.destination.type}"
)
}

var actionResponseContent = ""
actionResponseContent = config.channel
?.sendNotification(
client,
config.channel.getTitle(subject),
config.channel.createMessageContent(subject, message)
) ?: actionResponseContent

actionResponseContent = config.destination
?.buildLegacyBaseMessage(subject, message, destinationContextFactory.getDestinationContext(config.destination))
?.publishLegacyNotification(client)
?: actionResponseContent

return actionResponseContent
}

private fun compileTemplate(template: Script, ctx: TriggerExecutionContext): String {
return scriptService.compile(template, TemplateScript.CONTEXT)
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg()))
.execute()
}

/**
* The "destination" ID referenced in a Monitor Action could either be a Notification config or a Destination config
* depending on whether the background migration process has already migrated it from a Destination to a Notification config.
*
* To cover both of these cases, the Notification config will take precedence and if it is not found, the Destination will be retrieved.
*/
private suspend fun getConfigForNotificationAction(action: Action): NotificationActionConfigs {
var destination: Destination? = null
val channel: NotificationConfigInfo? = getNotificationConfigInfo(client as NodeClient, action.destinationId)

// If the channel was not found, try to retrieve the Destination
if (channel == null) {
destination = try {
AlertingConfigAccessor.getDestinationInfo(client, xContentRegistry, action.destinationId)
} catch (e: IllegalStateException) {
// Catching the exception thrown when the Destination was not found so the NotificationActionConfigs object can be returned
null
}
}

return NotificationActionConfigs(destination, channel)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@

package org.opensearch.alerting.model.destination

import org.opensearch.alerting.opensearchapi.string
import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import java.io.IOException
import java.lang.IllegalStateException

Expand Down Expand Up @@ -71,12 +68,8 @@ data class Chime(val url: String) : ToXContent {
}
}

fun constructMessageContent(subject: String?, message: String?): String {
val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
val builder = XContentFactory.contentBuilder(XContentType.JSON)
builder.startObject()
.field("Content", messageContent)
.endObject()
return builder.string()
// Complete JSON structure is now constructed in the notification plugin
fun constructMessageContent(subject: String?, message: String): String {
return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,27 @@
package org.opensearch.alerting.model.destination

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.destination.Notification
import org.opensearch.alerting.destination.message.BaseMessage
import org.opensearch.alerting.destination.message.ChimeMessage
import org.opensearch.alerting.destination.message.CustomWebhookMessage
import org.opensearch.alerting.destination.message.EmailMessage
import org.opensearch.alerting.destination.message.SlackMessage
import org.opensearch.alerting.model.destination.email.Email
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.instant
import org.opensearch.alerting.opensearchapi.optionalTimeField
import org.opensearch.alerting.opensearchapi.optionalUserField
import org.opensearch.alerting.util.DestinationType
import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.alerting.util.isHostInDenylist
import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertAlertingToNotificationMethodType
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.commons.authuser.User
import org.opensearch.commons.destination.message.LegacyBaseMessage
import org.opensearch.commons.destination.message.LegacyChimeMessage
import org.opensearch.commons.destination.message.LegacyCustomWebhookMessage
import org.opensearch.commons.destination.message.LegacyEmailMessage
import org.opensearch.commons.destination.message.LegacySlackMessage
import java.io.IOException
import java.net.InetAddress
import java.time.Instant
import java.util.Locale

Expand Down Expand Up @@ -238,72 +236,52 @@ data class Destination(
}
}

@Throws(IOException::class)
fun publish(
fun buildLegacyBaseMessage(
compiledSubject: String?,
compiledMessage: String,
destinationCtx: DestinationContext,
denyHostRanges: List<String>
): String {
destinationCtx: DestinationContext
): LegacyBaseMessage {

val destinationMessage: BaseMessage
val responseContent: String
val responseStatusCode: Int
val destinationMessage: LegacyBaseMessage
when (type) {
DestinationType.CHIME -> {
val messageContent = chime?.constructMessageContent(compiledSubject, compiledMessage)
destinationMessage = ChimeMessage.Builder(name)
destinationMessage = LegacyChimeMessage.Builder(name)
.withUrl(chime?.url)
.withMessage(messageContent)
.build()
}
DestinationType.SLACK -> {
val messageContent = slack?.constructMessageContent(compiledSubject, compiledMessage)
destinationMessage = SlackMessage.Builder(name)
destinationMessage = LegacySlackMessage.Builder(name)
.withUrl(slack?.url)
.withMessage(messageContent)
.build()
}
DestinationType.CUSTOM_WEBHOOK -> {
destinationMessage = CustomWebhookMessage.Builder(name)
.withUrl(customWebhook?.url)
.withScheme(customWebhook?.scheme)
.withHost(customWebhook?.host)
.withPort(customWebhook?.port)
.withPath(customWebhook?.path)
.withMethod(customWebhook?.method)
.withQueryParams(customWebhook?.queryParams)
destinationMessage = LegacyCustomWebhookMessage.Builder(name)
.withUrl(getLegacyCustomWebhookMessageURL(customWebhook))
.withHeaderParams(customWebhook?.headerParams)
.withMessage(compiledMessage).build()
}
DestinationType.EMAIL -> {
val emailAccount = destinationCtx.emailAccount
destinationMessage = EmailMessage.Builder(name)
destinationMessage = LegacyEmailMessage.Builder(name)
.withAccountName(emailAccount?.name)
.withHost(emailAccount?.host)
.withPort(emailAccount?.port)
.withMethod(emailAccount?.method?.value)
.withUserName(emailAccount?.username)
.withPassword(emailAccount?.password)
.withMethod(emailAccount?.method?.let { convertAlertingToNotificationMethodType(it).toString() })
.withFrom(emailAccount?.email)
.withRecipients(destinationCtx.recipients)
.withSubject(compiledSubject)
.withMessage(compiledMessage).build()
}
DestinationType.TEST_ACTION -> {
return "test action"
}
else -> throw IllegalArgumentException("Unsupported Destination type [$type] for building legacy message")
}

validateDestinationUri(destinationMessage, denyHostRanges)
val response = Notification.publish(destinationMessage) as org.opensearch.alerting.destination.response.DestinationResponse
responseContent = response.responseContent
responseStatusCode = response.statusCode

logger.info("Message published for action name: $name, messageid: $responseContent, statuscode: $responseStatusCode")
return responseContent
return destinationMessage
}

fun constructResponseForDestinationType(type: DestinationType): Any {
private fun constructResponseForDestinationType(type: DestinationType): Any {
var content: Any? = null
when (type) {
DestinationType.CHIME -> content = chime?.convertToMap()?.get(type.value)
Expand All @@ -318,13 +296,14 @@ data class Destination(
return content
}

private fun validateDestinationUri(destinationMessage: BaseMessage, denyHostRanges: List<String>) {
if (destinationMessage.isHostInDenylist(denyHostRanges)) {
logger.error(
"Host: {} resolves to: {} which is in denylist: {}.", destinationMessage.uri.host,
InetAddress.getByName(destinationMessage.uri.host), denyHostRanges
)
throw IOException("The destination address is invalid.")
}
private fun getLegacyCustomWebhookMessageURL(customWebhook: CustomWebhook?): String {
return LegacyCustomWebhookMessage.Builder(name)
.withUrl(customWebhook?.url)
.withScheme(customWebhook?.scheme)
.withHost(customWebhook?.host)
.withPort(customWebhook?.port)
.withPath(customWebhook?.path)
.withQueryParams(customWebhook?.queryParams)
.build().uri.toString()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@

package org.opensearch.alerting.model.destination

import org.opensearch.alerting.opensearchapi.string
import org.opensearch.common.Strings
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken
import org.opensearch.common.xcontent.XContentType
import java.io.IOException
import java.lang.IllegalStateException

Expand Down Expand Up @@ -71,12 +68,8 @@ data class Slack(val url: String) : ToXContent {
}
}

// Complete JSON structure is now constructed in the notification plugin
fun constructMessageContent(subject: String?, message: String): String {
val messageContent: String? = if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
val builder = XContentFactory.contentBuilder(XContentType.JSON)
builder.startObject()
.field("text", messageContent)
.endObject()
return builder.string()
return if (Strings.isNullOrEmpty(subject)) message else "$subject \n\n $message"
}
}
Loading