Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.5] Manual backports of several PRs #957

Merged
merged 9 commits into from
Jun 8, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerindexNameExpressionResolver(indexNameExpressionResolver)
.registerScriptService(scriptService)
.registerSettings(settings)
.registerThreadPool(threadPool)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
Expand All @@ -24,9 +22,11 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
Expand Down Expand Up @@ -54,6 +54,7 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -88,13 +89,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
monitor,
createWithRunContext = false,
skipIndex = isTempMonitor
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]

val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
Expand All @@ -107,6 +108,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val docsToQueries = mutableMapOf<String, MutableList<String>>()

try {
// Resolve all passed indices to concrete indices
val indices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
Expand All @@ -115,12 +123,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexTimeout = monitorCtx.indexTimeout!!
)

val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()

// cleanup old indices that are not monitored anymore from the same monitor
for (ind in updatedLastRunContext.keys) {
if (!indices.contains(ind)) {
Expand All @@ -131,8 +133,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
val isIndexCreatedRecently = createdRecently(
monitor,
periodStart,
periodEnd,
monitorCtx.clusterService!!.state().metadata.index(indexName)
)
MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
Expand Down Expand Up @@ -385,9 +392,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
throw IOException("Invalid input with document-level-monitor.")
}

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
if (docLevelMonitorInput.indices.size > 1) {
throw IOException("Only one index is supported with document-level-monitor.")
if ((monitor.inputs[0] as DocLevelMonitorInput).indices.isEmpty()) {
throw IllegalArgumentException("DocLevelMonitorInput has no indices")
}
}

Expand All @@ -414,13 +420,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// new index is monitored from the beginning of that index
private fun createdRecently(
monitor: Monitor,
index: String,
periodStart: Instant,
periodEnd: Instant,
getIndexResponse: GetIndexResponse
indexMetadata: IndexMetadata
): Boolean {
val lastExecutionTime = if (periodStart == periodEnd) monitor.lastUpdateTime else periodStart
return getIndexResponse.settings.get(index).getAsLong("index.creation_date", 0L) > lastExecutionTime.toEpochMilli()
val indexCreationDate = indexMetadata.settings.get("index.creation_date")?.toLong() ?: 0L
return indexCreationDate > lastExecutionTime.toEpochMilli()
}

/**
Expand Down
28 changes: 18 additions & 10 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,24 @@ abstract class MonitorRunner {
throw IllegalStateException("Message content missing in the Destination with id: ${action.destinationId}")
}
if (!dryrun) {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
withClosableContext(
InjectorContextElement(monitor.id, monitorCtx.settings!!, monitorCtx.threadPool!!.threadContext, roles)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
val client = monitorCtx.client
client!!.threadPool().threadContext.stashContext().use {
withClosableContext(
InjectorContextElement(
monitor.id,
monitorCtx.settings!!,
monitorCtx.threadPool!!.threadContext,
monitor.user?.roles,
monitor.user
)
) {
actionOutput[Action.MESSAGE_ID] = getConfigAndSendNotification(
action,
monitorCtx,
actionOutput[Action.SUBJECT],
actionOutput[Action.MESSAGE]!!
)
}
}
}
ActionRunResult(action.id, action.name, actionOutput, false, MonitorRunnerService.currentTime(), null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
Expand All @@ -25,6 +26,7 @@ data class MonitorRunnerExecutionContext(
var clusterService: ClusterService? = null,
var client: Client? = null,
var xContentRegistry: NamedXContentRegistry? = null,
var indexNameExpressionResolver: IndexNameExpressionResolver? = null,
var scriptService: ScriptService? = null,
var settings: Settings? = null,
var threadPool: ThreadPool? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.isDocLevelMonitor
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.component.AbstractLifecycleComponent
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -75,6 +76,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerindexNameExpressionResolver(indexNameExpressionResolver: IndexNameExpressionResolver): MonitorRunnerService {
this.monitorCtx.indexNameExpressionResolver = indexNameExpressionResolver
return this
}

fun registerScriptService(scriptService: ScriptService): MonitorRunnerService {
this.monitorCtx.scriptService = scriptService
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting.alerts

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.ResourceAlreadyExistsException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
Expand Down Expand Up @@ -36,6 +37,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTO
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_RETENTION_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDING_HISTORY_ROLLOVER_PERIOD
import org.opensearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.ClusterChangedEvent
Expand Down Expand Up @@ -357,8 +359,12 @@ class AlertIndices(
return try {
val createIndexResponse: CreateIndexResponse = client.admin().indices().suspendUntil { create(request, it) }
createIndexResponse.isAcknowledged
} catch (e: ResourceAlreadyExistsException) {
true
} catch (t: Exception) {
if (ExceptionsHelper.unwrapCause(t) is ResourceAlreadyExistsException) {
true
} else {
throw AlertingException.wrap(t)
}
}
}

Expand Down
Loading