From fb8dee8602029f1023c45827991c54189780741a Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 6 Apr 2022 08:01:40 +0000 Subject: [PATCH] percolate query implementation in doc-level alerting Signed-off-by: Subhobrata Dey --- alerting/build.gradle | 1 + .../org/opensearch/alerting/AlertingPlugin.kt | 5 +- .../DocumentReturningMonitorRunner.kt | 151 ++++++++++++++- .../transport/TransportDeleteMonitorAction.kt | 23 +++ .../TransportExecuteMonitorAction.kt | 97 +++++++++- .../transport/TransportIndexMonitorAction.kt | 172 ++++++++++++++++++ .../alerting/DocumentMonitorRunnerIT.kt | 1 + .../alerting/core/DocLevelMonitorQueries.kt | 39 ++++ .../alerting/core/model/ScheduledJob.kt | 1 + .../resources/mappings/doc-level-queries.json | 13 ++ 10 files changed, 492 insertions(+), 11 deletions(-) create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt create mode 100644 core/src/main/resources/mappings/doc-level-queries.json diff --git a/alerting/build.gradle b/alerting/build.gradle index 47d0026f5..62a9b9e62 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -56,6 +56,7 @@ configurations.testImplementation { dependencies { compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}" + api "org.opensearch.plugin:percolator-client:${opensearch_version}" // OpenSearch Nanny state implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index ace23402b..5133db3f1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction @@ -152,6 +153,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var scheduler: JobScheduler lateinit var sweeper: JobSweeper lateinit var scheduledJobIndices: ScheduledJobIndices + lateinit var docLevelMonitorQueries: DocLevelMonitorQueries lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService @@ -257,11 +259,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) + docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) this.threadPool = threadPool this.clusterService = clusterService - return listOf(sweeper, scheduler, runner, scheduledJobIndices) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries) } override fun getSettings(): List> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt index 16bb92b5e..4b4f47253 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentReturningMonitorRunner.kt @@ -4,11 +4,13 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.index.IndexRequest +import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.core.model.DocLevelMonitorInput import org.opensearch.alerting.core.model.DocLevelQuery +import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert @@ -30,11 +32,14 @@ import org.opensearch.client.Client import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.Strings +import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.XContentBuilder +import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders +import org.opensearch.percolator.PercolateQueryBuilder import org.opensearch.rest.RestStatus import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder @@ -93,22 +98,59 @@ object DocumentReturningMonitorRunner : MonitorRunner { } } - val queryToDocIds = mutableMapOf>() + val queryToDocIds = mutableMapOf>() val docsToQueries = mutableMapOf>() val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) val idQueryMap = mutableMapOf() - queries.forEach { query -> - val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index) + + val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) +/* matchingDocs.forEach { + logger.info(monitor.id + "-" + it.first) + }*/ + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) +// logger.info(monitor.id + "-" + matchedQueriesForDocs.hits.size) + + matchedQueriesForDocs.forEach { hit -> + val (id, query) = Pair( + hit.id.replace("_${monitor.id}", ""), + ((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] + ) +// logger.info("found hit-$id-$query") + val docLevelQuery = DocLevelQuery(id, id, query.toString()) + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + if (queryToDocIds.containsKey(docLevelQuery)) { + queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) + } else { + queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) + } + + if (docsToQueries.containsKey(matchingDocs[idx].first)) { + docsToQueries[matchingDocs[idx].first]?.add(id) + } else { + docsToQueries[matchingDocs[idx].first] = mutableListOf(id) + } + } + } + } + +/* queries.forEach { query -> + val matchingDocIds = runForEachQuery(monitor, monitorCtx, docExecutionContext, query, index, dryrun) queryToDocIds[query] = matchingDocIds matchingDocIds.forEach { docsToQueries.putIfAbsent(it, mutableListOf()) docsToQueries[it]?.add(query.id) } - idQueryMap[query.id] = query - } + + }*/ val queryInputResults = queryToDocIds.mapKeys { it.key.id } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) - val queryIds = queries.map { it.id } + val queryIds = queries.map { + idQueryMap[it.id] = it + it.id + } val triggerResults = mutableMapOf() monitor.triggers.forEach { @@ -339,6 +381,56 @@ object DocumentReturningMonitorRunner : MonitorRunner { return allShards.filter { it.primary() }.size } + private fun getMatchingDocs( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + docExecutionCtx: DocumentExecutionContext, + index: String, + dryrun: Boolean + ): List> { + val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int + val matchingDocs = mutableListOf>() + for (i: Int in 0 until count) { + val shard = i.toString() + try { + logger.info("Monitor execution for shard: $shard") + + val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() + logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") + + // If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 + val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) + max(-1, maxSeqNo - 1) + else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + + if (dryrun) { + logger.info("it is a dryrun") + } + + logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") + + val hits: SearchHits = searchShard( + monitorCtx, + index, + shard, + prevSeqNo, + maxSeqNo, + null + ) + logger.info("Search hits for shard_$shard is: ${hits.hits.size}") + + if (hits.hits.isNotEmpty()) { +// logger.info("found matches") + matchingDocs.addAll(getAllDocs(hits, monitor.id)) + } + } catch (e: Exception) { + logger.info("Failed to run for shard $shard. Error: ${e.message}") + logger.debug("Failed to run for shard $shard", e) + } + } + return matchingDocs + } + private fun runForEachQuery( monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, @@ -384,14 +476,17 @@ object DocumentReturningMonitorRunner : MonitorRunner { shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String + query: String? ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() } val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + + if (query != null) { + boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) + } val request: SearchRequest = SearchRequest() .indices(index) @@ -410,7 +505,47 @@ object DocumentReturningMonitorRunner : MonitorRunner { return response.hits } + private fun getMatchedQueries( + monitorCtx: MonitorRunnerExecutionContext, + docs: List, + monitor: Monitor + ): SearchHits { + val percolateQueryBuilder = PercolateQueryBuilder("query", docs, XContentType.JSON) + + val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + val searchSourceBuilder = SearchSourceBuilder() + searchSourceBuilder.query(percolateQueryBuilder) + searchRequest.source(searchSourceBuilder) + + return if (monitorCtx.clusterService!!.state().routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + val response: SearchResponse = monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest).actionGet() + + if (response.status() !== RestStatus.OK) { + throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + } + response.hits + } else { + SearchHits.empty() + } + } + private fun getAllDocIds(hits: SearchHits): List { return hits.map { hit -> hit.id } } + + private fun getAllDocs(hits: SearchHits, monitorId: String): List> { + return hits.map { hit -> + val sourceMap = hit.sourceAsMap + + var xContentBuilder = XContentFactory.jsonBuilder().startObject() + sourceMap.forEach { (k, v) -> + xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) + } + xContentBuilder = xContentBuilder.endObject() + + val sourceRef = BytesReference.bytes(xContentBuilder) + + Pair(hit.id, sourceRef) + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index 606795f22..b52d5817a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -29,6 +29,10 @@ import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -132,6 +136,10 @@ class TransportDeleteMonitorAction @Inject constructor( deleteRequest, object : ActionListener { override fun onResponse(response: DeleteResponse) { + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + deleteDocLevelMonitorQueries() + } actionListener.onResponse(response) } @@ -141,5 +149,20 @@ class TransportDeleteMonitorAction @Inject constructor( } ) } + + private fun deleteDocLevelMonitorQueries() { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) { + } + + override fun onFailure(t: Exception) { + } + } + ) + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 8813418fa..60109e620 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -11,19 +11,32 @@ import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest import org.opensearch.alerting.action.ExecuteMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries +import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.common.xcontent.XContentHelper @@ -40,12 +53,16 @@ private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) class TransportExecuteMonitorAction @Inject constructor( transportService: TransportService, private val client: Client, + private val clusterService: ClusterService, private val runner: MonitorRunnerService, actionFilters: ActionFilters, - val xContentRegistry: NamedXContentRegistry + val xContentRegistry: NamedXContentRegistry, + private val docLevelMonitorQueries: DocLevelMonitorQueries, + private val settings: Settings ) : HandledTransportAction ( ExecuteMonitorAction.NAME, transportService, actionFilters, ::ExecuteMonitorRequest ) { + @Volatile private var indexTimeout = AlertingSettings.INDEX_TIMEOUT.get(settings) override fun doExecute(task: Task, execMonitorRequest: ExecuteMonitorRequest, actionListener: ActionListener) { @@ -74,6 +91,66 @@ class TransportExecuteMonitorAction @Inject constructor( } } } + val indexDocLevelMonitorQueries = fun(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + client.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).timeout(indexTimeout) + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + client.bulk( + request, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + log.info("call return") + executeMonitor(monitor) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + ) + } + } + } if (execMonitorRequest.monitorId != null) { val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX).id(execMonitorRequest.monitorId) @@ -110,7 +187,23 @@ class TransportExecuteMonitorAction @Inject constructor( true -> execMonitorRequest.monitor as Monitor false -> (execMonitorRequest.monitor as Monitor).copy(user = user) } - executeMonitor(monitor) + + if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + indexDocLevelMonitorQueries(monitor, monitor.id) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } + } else { + executeMonitor(monitor) + } } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 3575d5792..bd2cf4e54 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -10,6 +10,9 @@ import org.opensearch.OpenSearchSecurityException import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.action.index.IndexRequest @@ -18,13 +21,16 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.DocumentReturningMonitorRunner import org.opensearch.alerting.action.IndexMonitorAction import org.opensearch.alerting.action.IndexMonitorRequest import org.opensearch.alerting.action.IndexMonitorResponse +import org.opensearch.alerting.core.DocLevelMonitorQueries import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.model.DocLevelMonitorInput +import org.opensearch.alerting.core.model.DocLevelQuery import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.alerting.core.model.SearchInput @@ -52,6 +58,9 @@ import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.authuser.User import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestRequest import org.opensearch.rest.RestStatus import org.opensearch.search.builder.SearchSourceBuilder @@ -67,6 +76,7 @@ class TransportIndexMonitorAction @Inject constructor( val client: Client, actionFilters: ActionFilters, val scheduledJobIndices: ScheduledJobIndices, + val docLevelMonitorQueries: DocLevelMonitorQueries, val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry @@ -395,6 +405,11 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + + if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + indexDocLevelMonitorQueries(request.monitor, response.id, request.refreshPolicy) + } + actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, @@ -409,6 +424,147 @@ class TransportIndexMonitorAction @Inject constructor( ) } + @Suppress("UNCHECKED_CAST") + private fun indexDocLevelMonitorQueries(monitor: Monitor, monitorId: String, refreshPolicy: RefreshPolicy) { + val indexDocLevelQueries = fun(monitor: Monitor, monitorId: String) { + val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + client.admin().indices().putMapping( + updateMappingRequest, + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest().setRefreshPolicy(refreshPolicy).timeout(indexTimeout) + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source(mapOf("query" to mapOf("query_string" to mapOf("query" to query)), "monitor_id" to monitorId)) + request.add(indexRequest) + } + + client.bulk( + request, + object : ActionListener { + override fun onResponse(response: BulkResponse) { + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + } + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(AlertingException.wrap(e)) + } + } + ) + } + } + } + + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + indexDocLevelQueries(monitor, monitorId) + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) + } + }) + } +/* val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput + val index = docLevelMonitorInput.indices[0] + val queries: List = docLevelMonitorInput.queries + + if (!docLevelMonitorQueries.docLevelQueryIndexExists()) { + docLevelMonitorQueries.initDocLevelQueryIndex(null) + log.info("Central Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} created") + } + + val clusterState = clusterService.state() + if (clusterState.routingTable.hasIndex(index)) { + val indexMetadata = clusterState.metadata.index(index) + + if (indexMetadata.mapping() != null) { + val properties = + ((indexMetadata.mapping()?.sourceAsMap?.get("properties")) as Map>) + val updatedProperties = + properties.entries.associate { "${it.key}_$monitorId" to it.value }.toMutableMap() + + if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) { + val updateMappingRequest = PutMappingRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + updateMappingRequest.source(mapOf("properties" to updatedProperties)) + + val ackResponse = client.admin().indices().putMapping(updateMappingRequest).actionGet() + if (!ackResponse.isAcknowledged) { + actionListener.onFailure(AlertingException.wrap(RuntimeException("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} cannot be updated with new mappings"))) + return + } + + log.info("Percolation index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} updated with new mappings") + + val request = BulkRequest() + + queries.forEach { + var query = it.query + + properties.forEach { prop -> + query = query.replace(prop.key, "${prop.key}_$monitorId") + } + val indexRequest = IndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .id(it.id + "_$monitorId") + .source( + mapOf( + "query" to mapOf("query_string" to mapOf("query" to query)), + "monitor_id" to monitorId + ) + ) + request.add(indexRequest) + } + + val bulkResponse = client.bulk(request).actionGet() + if (bulkResponse.hasFailures()) { + actionListener.onFailure(AlertingException.wrap(RuntimeException(bulkResponse.buildFailureMessage()))) + return + } + + log.info("Queries inserted into Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") + client.admin().indices().refresh(RefreshRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) + .actionGet() + } + } else { + actionListener.onFailure(AlertingException.wrap(RuntimeException("Percolate index ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX} not found"))) + } + } else { + actionListener.onFailure(AlertingException.wrap(RuntimeException("Input index $index not found"))) + }*/ + } + private fun updateMonitor() { val getRequest = GetRequest(SCHEDULED_JOBS_INDEX, request.monitorId) client.get( @@ -476,6 +632,22 @@ class TransportIndexMonitorAction @Inject constructor( ) return } + + if (currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) { + indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, request.refreshPolicy) + } + + override fun onFailure(t: Exception) { + } + } + ) + } actionListener.onResponse( IndexMonitorResponse( response.id, response.version, response.seqNo, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index baa2a8946..fcc70ffba 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -119,6 +119,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val trigger = randomDocumentReturningTrigger(condition = ALWAYS_RUN) val monitor = createMonitor(randomDocumentReturningMonitor(inputs = listOf(docReturningInput), triggers = listOf(trigger))) + Thread.sleep(2000) indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt new file mode 100644 index 000000000..ac55c1907 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/DocLevelMonitorQueries.kt @@ -0,0 +1,39 @@ +package org.opensearch.alerting.core + +import org.opensearch.action.ActionListener +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.alerting.core.model.ScheduledJob +import org.opensearch.client.AdminClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings + +class DocLevelMonitorQueries(private val client: AdminClient, private val clusterService: ClusterService) { + companion object { + @JvmStatic + fun docLevelQueriesMappings(): String { + return DocLevelMonitorQueries::class.java.classLoader.getResource("mappings/doc-level-queries.json").readText() + } + } + + fun initDocLevelQueryIndex(actionListener: ActionListener?) { + if (!docLevelQueryIndexExists()) { + var indexRequest = CreateIndexRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + .mapping(docLevelQueriesMappings()) + .settings( + Settings.builder().put("index.hidden", true) + .build() + ) + if (actionListener != null) { + client.indices().create(indexRequest, actionListener) + } else { + client.indices().create(indexRequest).actionGet() + } + } + } + + fun docLevelQueryIndexExists(): Boolean { + val clusterState = clusterService.state() + return clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt index 6b132ced6..95e48d7e5 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/ScheduledJob.kt @@ -36,6 +36,7 @@ interface ScheduledJob : Writeable, ToXContentObject { companion object { /** The name of the ElasticSearch index in which we store jobs */ const val SCHEDULED_JOBS_INDEX = ".opendistro-alerting-config" + const val DOC_LEVEL_QUERIES_INDEX = ".opendistro-alerting-queries" const val NO_ID = "" diff --git a/core/src/main/resources/mappings/doc-level-queries.json b/core/src/main/resources/mappings/doc-level-queries.json new file mode 100644 index 000000000..6be0b0e4a --- /dev/null +++ b/core/src/main/resources/mappings/doc-level-queries.json @@ -0,0 +1,13 @@ +{ + "_meta": { + "schema_version": 5 + }, + "properties": { + "query": { + "type": "percolator" + }, + "monitor_id": { + "type": "text" + } + } +} \ No newline at end of file