Refactor MonitorRunner (#143)
Signed-off-by: Mohammad Qureshi <[email protected]>
qreshi authored Aug 20, 2021
1 parent 751aabd commit 352d272
Showing 7 changed files with 501 additions and 318 deletions.
202 changes: 202 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
@@ -0,0 +1,202 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertError
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.elasticapi.firstFailureOrNull
import org.opensearch.alerting.elasticapi.retry
import org.opensearch.alerting.elasticapi.suspendUntil
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

/** Service that handles CRUD operations for alerts */
class AlertService(
val client: Client,
val xContentRegistry: NamedXContentRegistry,
val alertIndices: AlertIndices
) {

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlerts(monitor: Monitor): Map<Trigger, Alert?> {
val request = SearchRequest(AlertIndices.ALERT_INDEX)
.routing(monitor.id)
.source(alertQuery(monitor))
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() != RestStatus.OK) {
throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}

val foundAlerts = response.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
}
}

return monitor.triggers.associate { trigger ->
trigger to (foundAlerts[trigger.id]?.firstOrNull())
}
}

fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results that do not exist in the current alert
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> !currentActionIds.contains(it.key) }
.map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
} else {
updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime,
if (it.value.throttled) 1 else 0) })
}

// Merge the alert's error message into the current alert's error history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
}
}

suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy) {
var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
// back we're ok if that acknowledgement is lost. It's easier to get the user to retry than for the runner to
// spend time reloading the alert and writing it back.
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
}
Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isHistoryEnabled()) {
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(alert.id)
} else null
)
}
}
}

if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
.map { bulkRequest.requests()[it.itemId] as IndexRequest }

if (requestsToRetry.isNotEmpty()) {
val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}

private fun alertQuery(monitor: Monitor): SearchSourceBuilder {
return SearchSourceBuilder.searchSource()
.size(monitor.triggers.size * 2) // We expect only a single in-progress alert per trigger, so fetch 2 per trigger to detect duplicates
.query(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
this != null && alertError == null -> this
this == null && alertError != null -> listOf(alertError)
this != null && alertError != null -> (listOf(alertError) + this).take(10)
else -> throw IllegalStateException("Unreachable code reached!")
}
}
}
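
For orientation, the sketch below shows one way a caller might combine the AlertService methods above during a monitor run. It is an illustration only, not code from this commit: the function name, the way trigger results are paired with their execution contexts, and the absence of per-action errors are all assumptions.

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.script.TriggerExecutionContext

// Hypothetical usage sketch: compose the next alert state for each trigger result
// and persist the batch with the retry policy used for 429 back-offs.
suspend fun persistTriggerResults(
    alertService: AlertService,
    monitor: Monitor,
    results: List<Pair<TriggerExecutionContext, TriggerRunResult>>,
    retryPolicy: BackoffPolicy
) {
    // loadCurrentAlerts(monitor) would normally be called first so each
    // TriggerExecutionContext can carry the trigger's in-progress alert.
    val updatedAlerts = results.mapNotNull { (ctx, runResult) ->
        // composeAlert returns null when there is nothing to save, e.g. an
        // acknowledged alert that completed without an error.
        alertService.composeAlert(ctx, runResult, alertError = null)
    }
    // ACTIVE/ERROR alerts are indexed; COMPLETED alerts are deleted and, when
    // history is enabled, copied to the history index.
    alertService.saveAlerts(updatedAlerts, retryPolicy)
}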
@@ -239,7 +239,19 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunner(settings, client, threadPool, scriptService, xContentRegistry, alertIndices, clusterService)
runner = MonitorRunner
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, xContentRegistry))
.registerTriggerService(TriggerService(client, scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
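
The MonitorRunner itself is not shown in this diff, but the registration chain above implies that each register* method stores its dependency and returns the runner so calls can be chained. A minimal sketch of that pattern, with a hypothetical object name, hypothetical fields, and only two of the registrations, might look like this:

import org.opensearch.client.Client
import org.opensearch.script.ScriptService

// Hypothetical sketch of the chainable registration pattern used above; the
// real MonitorRunner and its fields are not part of this diff.
object MonitorRunnerSketch {
    private lateinit var client: Client
    private lateinit var scriptService: ScriptService

    fun registerClient(client: Client): MonitorRunnerSketch {
        this.client = client
        return this
    }

    fun registerScriptService(scriptService: ScriptService): MonitorRunnerSketch {
        this.scriptService = scriptService
        return this
    }
}

Chaining like this lets the plugin wire dependencies at startup without a long constructor, at the cost of the runner holding mutable, late-initialized state.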
130 changes: 130 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.elasticapi.suspendUntil
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
val scriptService: ScriptService,
val xContentRegistry: NamedXContentRegistry
) {

private val logger = LogManager.getLogger(InputService::class.java)

suspend fun collectInputResults(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
monitor.inputs.forEach { input ->
when (input) {
is SearchInput -> {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
}
}
InputRunResults(results.toList())
} catch (e: Exception) {
logger.info("Error collecting inputs for monitor: ${monitor.id}", e)
InputRunResults(emptyList(), e)
}
}

/**
* The anomaly result index has been moved to the system index list, so regular users can no
* longer query it directly. This method stashes the current thread context to pass the security
* check so that the monitor job can access the anomaly result index, and adds a monitor user
* backend roles filter to the search query so that only documents the monitor user can access
* are returned.
*
* In the Alerting Kibana plugin, monitor users can only see detectors they have read access to,
* so they cannot create a monitor on another user's detector that they cannot read. Even if they
* know another user's detector ID and use it to create a monitor, this method will only return
* anomaly results they are allowed to read.
*/
suspend fun collectInputResultsForADMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
val input = monitor.inputs[0] as SearchInput

val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}

// Add user role filter for AD results
client.threadPool().threadContext.stashContext().use {
// There is currently no way to verify whether the user has AD read permission, so the user
// role filter is always added here regardless of whether the AD backend role filter is enabled.
// Without it, when the AD backend role filter is disabled, a user could run a monitor on any
// detector and retrieve anomaly data without AD read permission. The trade-off is that if a
// domain disables the AD backend role filter, the monitor runner still cannot fetch AD results
// for a different user backend role, even when the monitor user does have permission to read
// AD results. This is a short-term compromise between user experience and security.
//
// Possible long-term solutions:
// 1. Use a secure REST client to call the AD search result API. If no permission exception is
// thrown, the user has read access to AD results, and the user role filter can be skipped when
// the AD backend role filter is disabled.
// 2. Have the security plugin provide a transport action that verifies whether the user has
// permission to search AD results. The monitor runner would check permission first; if granted
// and the AD role filter is enabled, the user role filter is added to protect data at the user
// role level, otherwise the user can query any AD result.
addUserBackendRolesFilter(monitor.user, searchRequest.source())
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
InputRunResults(results.toList())
} catch (e: Exception) {
logger.info("Error collecting anomaly result inputs for monitor: ${monitor.id}", e)
InputRunResults(emptyList(), e)
}
}
}
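
As a usage illustration only (the function name and the fixed one-minute period are assumptions; the real runner derives the period from the monitor's schedule), collecting inputs for one execution might look like:

import org.opensearch.alerting.InputService
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import java.time.Instant

// Hypothetical sketch: collect input results for a single execution period.
// Errors are captured inside InputRunResults rather than thrown, per collectInputResults above.
suspend fun gatherInputs(inputService: InputService, monitor: Monitor): InputRunResults {
    val periodEnd = Instant.now()
    val periodStart = periodEnd.minusSeconds(60) // assumed 1-minute interval, illustration only
    return inputService.collectInputResults(monitor, periodStart, periodEnd)
}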
