Skip to content

Commit

Permalink
[Backport 2.x] QueryIndex rollover when field mapping limit is reached (
Browse files Browse the repository at this point in the history
opensearch-project#729)

Signed-off-by: Petar Dzepina <[email protected]>
  • Loading branch information
petardz authored and lezzago committed Jun 7, 2023
1 parent ea34a55 commit eb9951f
Show file tree
Hide file tree
Showing 16 changed files with 674 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,14 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
this.threadPool = threadPool
this.clusterService = clusterService

MonitorMetadataService.initialize(
client,
clusterService,
xContentRegistry,
settings
)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
Expand All @@ -21,13 +22,13 @@ import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.ActionExecutionResult
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.action.PerAlertActionScope
import org.opensearch.alerting.opensearchapi.string
Expand All @@ -36,7 +37,6 @@ import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.updateMonitorMetadata
import org.opensearch.client.Client
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand All @@ -55,7 +55,6 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -88,22 +87,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(error = AlertingException.wrap(e))
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex()
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = monitorCtx.indexTimeout!!
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata")
if (monitorMetadata == null) {
monitorMetadata = createMonitorMetadata(monitor.id)
}

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
Expand Down Expand Up @@ -132,7 +129,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName, indexCreatedRecently)
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
Expand Down Expand Up @@ -160,7 +157,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
indexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")
Expand Down Expand Up @@ -214,7 +217,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
updateMonitorMetadata(monitorCtx.client!!, monitorCtx.settings!!, monitorMetadata.copy(lastRunContext = updatedLastRunContext))
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down Expand Up @@ -506,6 +512,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index))
Expand All @@ -516,7 +523,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
boolQueryBuilder.filter(percolateQueryBuilder)

val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
searchRequest.source(searchSourceBuilder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsAction
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.RestStatus
import org.opensearch.transport.RemoteTransportException

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

object MonitorMetadataService :
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("MonitorMetadataService")) {

private lateinit var client: Client
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var clusterService: ClusterService
private lateinit var settings: Settings

@Volatile private lateinit var indexTimeout: TimeValue

fun initialize(
client: Client,
clusterService: ClusterService,
xContentRegistry: NamedXContentRegistry,
settings: Settings
) {
this.clusterService = clusterService
this.client = client
this.xContentRegistry = xContentRegistry
this.settings = settings
this.indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings)
this.clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.INDEX_TIMEOUT) { indexTimeout = it }
}

@Suppress("ComplexMethod", "ReturnCount")
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
try {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)

if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
}
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason))
}
DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
}
}
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
)
} catch (e: Exception) {
throw AlertingException.wrap(e)
}
}

suspend fun getOrCreateMetadata(monitor: Monitor, createWithRunContext: Boolean = true): Pair<MonitorMetadata, Boolean> {
try {
val created = true
val metadata = getMetadata(monitor)
return if (metadata != null) {
metadata to !created
} else {
val newMetadata = createNewMetadata(monitor, createWithRunContext = createWithRunContext)
upsertMetadata(newMetadata, updating = false) to created
}
} catch (e: Exception) {
throw AlertingException.wrap(e)
}
}

suspend fun getMetadata(monitor: Monitor): MonitorMetadata? {
try {
val metadataId = MonitorMetadata.getId(monitor)
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, metadataId).routing(monitor.id)

val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
return if (getResponse.isExists) {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
MonitorMetadata.parse(xcp)
} else {
null
}
} catch (e: Exception) {
if (e.message?.contains("no such index") == true) {
return null
} else {
throw AlertingException.wrap(e)
}
}
}

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
else null
if (runContext != null) {
return metadata.copy(
lastRunContext = runContext
)
} else {
return metadata
}
} catch (e: Exception) {
throw AlertingException.wrap(e)
}
}

private suspend fun createNewMetadata(monitor: Monitor, createWithRunContext: Boolean): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext =
if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
id = "${monitor.id}-metadata",
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
monitorId = monitor.id,
lastActionExecutionTimes = emptyList(),
lastRunContext = runContext,
sourceToQueryIndexMapping = mutableMapOf()
)
}

private suspend fun createFullRunContext(
index: String?,
existingRunContext: MutableMap<String, MutableMap<String, Any>>? = null
): MutableMap<String, MutableMap<String, Any>> {
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
lastRunContext[indexName] = createRunContextForIndex(index)
}
}
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, unwrappedException)
} catch (e: OpenSearchSecurityException) {
throw AlertingException(
"Failed fetching index stats - missing required index permissions: ${e.localizedMessage}",
RestStatus.INTERNAL_SERVER_ERROR,
e
)
} catch (e: Exception) {
throw AlertingException("Failed fetching index stats", RestStatus.INTERNAL_SERVER_ERROR, e)
}
return lastRunContext
}

suspend fun createRunContextForIndex(index: String, createdRecently: Boolean = false): MutableMap<String, Any> {
val request = IndicesStatsRequest().indices(index).clear()
val response: IndicesStatsResponse = client.suspendUntil { execute(IndicesStatsAction.INSTANCE, request, it) }
if (response.status != RestStatus.OK) {
val errorMessage = "Failed fetching index stats for index:$index"
throw AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(errorMessage))
}
val shards = response.shards.filter { it.shardRouting.primary() && it.shardRouting.active() }
val lastRunContext = HashMap<String, Any>()
lastRunContext["index"] = index
val count = shards.size
lastRunContext["shards_count"] = count

for (shard in shards) {
lastRunContext[shard.shardRouting.id.toString()] =
if (createdRecently) -1L
else shard.seqNoStats?.globalCheckpoint ?: SequenceNumbers.UNASSIGNED_SEQ_NO
}
return lastRunContext
}
}
Loading

0 comments on commit eb9951f

Please sign in to comment.