Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor MonitorRunner #143

Merged
merged 1 commit into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertError
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.elasticapi.firstFailureOrNull
import org.opensearch.alerting.elasticapi.retry
import org.opensearch.alerting.elasticapi.suspendUntil
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.TriggerRunResult
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

/** Service that handles CRUD operations for alerts */
class AlertService(
val client: Client,
val xContentRegistry: NamedXContentRegistry,
val alertIndices: AlertIndices
) {

private val logger = LogManager.getLogger(AlertService::class.java)

suspend fun loadCurrentAlerts(monitor: Monitor): Map<Trigger, Alert?> {
val request = SearchRequest(AlertIndices.ALERT_INDEX)
.routing(monitor.id)
.source(alertQuery(monitor))
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
if (response.status() != RestStatus.OK) {
throw (response.firstFailureOrNull()?.cause ?: RuntimeException("Unknown error loading alerts"))
}

val foundAlerts = response.hits.map { Alert.parse(contentParser(it.sourceRef), it.id, it.version) }
.groupBy { it.triggerId }
foundAlerts.values.forEach { alerts ->
if (alerts.size > 1) {
logger.warn("Found multiple alerts for same trigger: $alerts")
}
}

return monitor.triggers.associate { trigger ->
trigger to (foundAlerts[trigger.id]?.firstOrNull())
}
}

fun composeAlert(ctx: TriggerExecutionContext, result: TriggerRunResult, alertError: AlertError?): Alert? {
val currentTime = Instant.now()
val currentAlert = ctx.alert

val updatedActionExecutionResults = mutableListOf<ActionExecutionResult>()
val currentActionIds = mutableSetOf<String>()
if (currentAlert != null) {
// update current alert's action execution results
for (actionExecutionResult in currentAlert.actionExecutionResults) {
val actionId = actionExecutionResult.actionId
currentActionIds.add(actionId)
val actionRunResult = result.actionResults[actionId]
when {
actionRunResult == null -> updatedActionExecutionResults.add(actionExecutionResult)
actionRunResult.throttled ->
updatedActionExecutionResults.add(actionExecutionResult.copy(
throttledCount = actionExecutionResult.throttledCount + 1))
else -> updatedActionExecutionResults.add(actionExecutionResult.copy(lastExecutionTime = actionRunResult.executionTime))
}
}
// add action execution results which not exist in current alert
updatedActionExecutionResults.addAll(result.actionResults.filter { it -> !currentActionIds.contains(it.key) }
.map { it -> ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0) })
} else {
updatedActionExecutionResults.addAll(result.actionResults.map { it -> ActionExecutionResult(it.key, it.value.executionTime,
if (it.value.throttled) 1 else 0) })
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
}
}

suspend fun saveAlerts(alerts: List<Alert>, retryPolicy: BackoffPolicy) {
var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
// back we're ok if that acknowledgement is lost. It's easier to get the user to retry than for the runner to
// spend time reloading the alert and writing it back.
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(IndexRequest(AlertIndices.ALERT_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(if (alert.id != Alert.NO_ID) alert.id else null))
}
Alert.State.ACKNOWLEDGED, Alert.State.DELETED -> {
throw IllegalStateException("Unexpected attempt to save ${alert.state} alert: $alert")
}
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isHistoryEnabled()) {
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS))
.id(alert.id)
} else null
)
}
}
}

if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
.map { bulkRequest.requests()[it.itemId] as IndexRequest }

if (requestsToRetry.isNotEmpty()) {
val retryCause = failedResponses.first { it.status() == RestStatus.TOO_MANY_REQUESTS }.failure.cause
throw ExceptionsHelper.convertToOpenSearchException(retryCause)
}
}
}

private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
}

private fun alertQuery(monitor: Monitor): SearchSourceBuilder {
return SearchSourceBuilder.searchSource()
.size(monitor.triggers.size * 2) // We expect there to be only a single in-progress alert so fetch 2 to check
.query(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitor.id))
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
this != null && alertError == null -> this
this == null && alertError != null -> listOf(alertError)
this != null && alertError != null -> (listOf(alertError) + this).take(10)
else -> throw IllegalStateException("Unreachable code reached!")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,19 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunner(settings, client, threadPool, scriptService, xContentRegistry, alertIndices, clusterService)
runner = MonitorRunner
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, xContentRegistry))
.registerTriggerService(TriggerService(client, scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
Expand Down
130 changes: 130 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.elasticapi.convertToMap
import org.opensearch.alerting.elasticapi.suspendUntil
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
val scriptService: ScriptService,
val xContentRegistry: NamedXContentRegistry
) {

private val logger = LogManager.getLogger(InputService::class.java)

suspend fun collectInputResults(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
monitor.inputs.forEach { input ->
when (input) {
is SearchInput -> {
// TODO: Figure out a way to use SearchTemplateRequest without bringing in the entire TransportClient
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(),
"period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
}
}
}
InputRunResults(results.toList())
} catch (e: Exception) {
logger.info("Error collecting inputs for monitor: ${monitor.id}", e)
InputRunResults(emptyList(), e)
}
}

/**
* We moved anomaly result index to system index list. So common user could not directly query
* this index any more. This method will stash current thread context to pass security check.
* So monitor job can access anomaly result index. We will add monitor user roles filter in
* search query to only return documents the monitor user can access.
*
* On alerting Kibana, monitor users can only see detectors that they have read access. So they
* can't create monitor on other user's detector which they have no read access. Even they know
* other user's detector id and use it to create monitor, this method will only return anomaly
* results they can read.
*/
suspend fun collectInputResultsForADMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant): InputRunResults {
return try {
val results = mutableListOf<Map<String, Any>>()
val input = monitor.inputs[0] as SearchInput

val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams), TemplateScript.CONTEXT)
.newInstance(searchParams)
.execute()

val searchRequest = SearchRequest().indices(*input.indices.toTypedArray())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
}

// Add user role filter for AD result
client.threadPool().threadContext.stashContext().use {
// Currently we have no way to verify if user has AD read permission or not. So we always add user
// role filter here no matter AD backend role filter enabled or not. If we don't add user role filter
// when AD backend filter disabled, user can run monitor on any detector and get anomaly data even
// they have no AD read permission. So if domain disabled AD backend role filter, monitor runner
// still can't get AD result with different user backend role, even the monitor user has permission
// to read AD result. This is a short term solution to trade off between user experience and security.
//
// Possible long term solution:
// 1.Use secure rest client to send request to AD search result API. If no permission exception,
// that mean user has read access on AD result. Then don't need to add user role filter when query
// AD result if AD backend role filter is disabled.
// 2.Security provide some transport action to verify if user has permission to search AD result.
// Monitor runner will send transport request to check permission first. If security plugin response
// is yes, user has permission to query AD result. If AD role filter enabled, we will add user role
// filter to protect data at user role level; otherwise, user can query any AD result.
addUserBackendRolesFilter(monitor.user, searchRequest.source())
val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
results += searchResponse.convertToMap()
}
InputRunResults(results.toList())
} catch (e: Exception) {
logger.info("Error collecting anomaly result inputs for monitor: ${monitor.id}", e)
InputRunResults(emptyList(), e)
}
}
}
Loading