Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QueryIndex rollover when field mapping limit is reached #725

Merged
merged 19 commits into from
Dec 31, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,20 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool, scheduledJobIndices)
this.threadPool = threadPool
this.clusterService = clusterService

MonitorMetadataFactory.initialize(
client,
clusterService,
xContentRegistry,
settings
)
MonitorMetadataService.initialize(
client,
clusterService,
xContentRegistry,
settings
)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are these objects needed? looks like lot of old alerting code is refactored into these classes?

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.action.index.IndexRequest
Expand All @@ -14,17 +15,16 @@ import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.AlertingConfigAccessor.Companion.getMonitorMetadata
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.updateMonitorMetadata
import org.opensearch.client.Client
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -52,7 +52,6 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.collections.HashMap
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -85,22 +84,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(error = AlertingException.wrap(e))
}

var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(monitor, createWithRunContext = false)

monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources)
monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries(
monitor = monitor,
monitorId = monitor.id,
monitorMetadata,
indexTimeout = monitorCtx.indexTimeout!!
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
val index = docLevelMonitorInput.indices[0]
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries

var monitorMetadata = getMonitorMetadata(monitorCtx.client!!, monitorCtx.xContentRegistry!!, "${monitor.id}-metadata")
if (monitorMetadata == null) {
monitorMetadata = createMonitorMetadata(monitor.id)
}

val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
Expand Down Expand Up @@ -129,7 +126,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
val indexCreatedRecently = createdRecently(monitor, indexName, periodStart, periodEnd, getIndexResponse)
createRunContext(monitorCtx.clusterService!!, monitorCtx.client!!, indexName, indexCreatedRecently)
MonitorMetadataService.createRunContextForIndex(indexName, indexCreatedRecently)
}

// Prepare updatedLastRunContext for each index
Expand Down Expand Up @@ -157,7 +154,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor, indexName)
val matchedQueriesForDocs = getMatchedQueries(
monitorCtx,
matchingDocs.map { it.second },
monitor,
monitorMetadata,
indexName
)

matchedQueriesForDocs.forEach { hit ->
val id = hit.id.replace("_${indexName}_${monitor.id}", "")
Expand Down Expand Up @@ -208,7 +211,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

// Don't update monitor if this is a test monitor
if (!isTempMonitor) {
updateMonitorMetadata(monitorCtx.client!!, monitorCtx.settings!!, monitorMetadata.copy(lastRunContext = updatedLastRunContext))
MonitorMetadataService.upsertMetadata(
monitorMetadata.copy(lastRunContext = updatedLastRunContext),
true
)
}

// TODO: Update the Document as part of the Trigger and return back the trigger action result
Expand Down Expand Up @@ -500,6 +506,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
index: String
): SearchHits {
val boolQueryBuilder = BoolQueryBuilder().filter(QueryBuilders.matchQuery("index", index))
Expand All @@ -510,7 +517,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
boolQueryBuilder.filter(percolateQueryBuilder)

val queryIndex = monitor.dataSources.queryIndex
val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id]
if (queryIndex == null) {
val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" +
" sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}"
logger.error(message)
throw AlertingException.wrap(
OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)
)
}
val searchRequest = SearchRequest(queryIndex)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(boolQueryBuilder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting

import org.opensearch.action.admin.indices.get.GetIndexRequest
import org.opensearch.action.admin.indices.get.GetIndexResponse
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.index.seqno.SequenceNumbers

object MonitorMetadataFactory {

private lateinit var client: Client
private lateinit var xContentRegistry: NamedXContentRegistry
private lateinit var clusterService: ClusterService
private lateinit var settings: Settings

fun initialize(
client: Client,
clusterService: ClusterService,
xContentRegistry: NamedXContentRegistry,
settings: Settings
) {
this.clusterService = clusterService
this.client = client
this.xContentRegistry = xContentRegistry
this.settings = settings
}

suspend fun newInstance(monitor: Monitor, createWithRunContext: Boolean): MonitorMetadata {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an unused method?

val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext =
if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
id = "${monitor.id}-metadata",
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
monitorId = monitor.id,
lastActionExecutionTimes = emptyList(),
lastRunContext = runContext,
sourceToQueryIndexMapping = mutableMapOf()
)
}

private suspend fun createFullRunContext(
index: String?,
existingRunContext: MutableMap<String, MutableMap<String, Any>>? = null
): MutableMap<String, MutableMap<String, Any>> {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
val indices = getIndexResponse.indices()
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName))
lastRunContext[indexName] = DocumentLevelMonitorRunner.createRunContext(clusterService, client, indexName)
}
return lastRunContext
}
}
Loading