diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt index a5522b535..13f6e8147 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteMonitorAction.kt @@ -14,6 +14,8 @@ import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.ActionRequest import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.delete.DeleteResponse import org.opensearch.action.get.GetRequest @@ -22,6 +24,7 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService import org.opensearch.alerting.opensearchapi.suspendUntil @@ -152,33 +155,51 @@ class TransportDeleteMonitorAction @Inject constructor( val metadata = MonitorMetadataService.getMetadata(monitor) metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - if (!clusterState.routingTable.hasIndex(queryIndex)) { + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { return } - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } - ) - } - - // Get document count on that index to check if we should delete it + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely val searchResponse: SearchResponse = client.suspendUntil { - search(SearchRequest(queryIndex).source(SearchSourceBuilder().size(0)), it) + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitorId) + ) + ) + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) } if (searchResponse.hits.totalHits.value == 0L) { val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete(DeleteIndexRequest(queryIndex), it) + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it + ) } if (ack.isAcknowledged == false) { - log.error("Failed to delete concrete queryIndex:$queryIndex during monitor deletion") + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitorId)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index c8f102968..5d20b51e4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -498,6 +498,8 @@ class TransportIndexMonitorAction @Inject constructor( if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } + // When inserting queries in queryIndex we could update sourceToQueryIndexMapping + MonitorMetadataService.upsertMetadata(metadata, updating = true) actionListener.onResponse( IndexMonitorResponse( @@ -625,7 +627,6 @@ class TransportIndexMonitorAction @Inject constructor( // Delete and insert all queries from/to queryIndex if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) - updatedMetadata = MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) client.suspendUntil { DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) .source(currentMonitor.dataSources.queryIndex) @@ -633,6 +634,7 @@ class TransportIndexMonitorAction @Inject constructor( .execute(it) } indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) + MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) } actionListener.onResponse( IndexMonitorResponse( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 2df99010e..0aa4ccfac 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -337,9 +337,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { SearchRequest(customQueryIndex).source(SearchSourceBuilder().query(QueryBuilders.matchAllQuery())) ).get() assertNotEquals(0, searchResponse.hits.hits.size) - client().execute( - AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) - ).get() + + deleteMonitor(monitorId) assertIndexNotExists(customQueryIndex + "*") assertAliasNotExists(customQueryIndex) } @@ -1000,8 +999,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { inputs = listOf(docLevelInput), triggers = listOf(trigger) ) - // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api - val docPayload: StringBuilder = StringBuilder(100000) + // Create doc with 11 fields + val docPayload: StringBuilder = StringBuilder(1000) docPayload.append("{") for (i in 1..10) { docPayload.append(""" "id$i":$i,""") @@ -1060,6 +1059,62 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertTrue(alerts.size == 2) } + /** + * 1. Create monitor with input source_index with 900 fields in mappings - can fit 1 in queryIndex + * 2. Update monitor and change input source_index to a new one with 900 fields in mappings + * 3. Expect queryIndex rollover resulting in 2 backing indices + * 4. Delete monitor and expect that all backing indices are deleted + * */ + fun `test updating monitor no execution queryIndex rolling over`() { + val testSourceIndex1 = "test_source_index1" + val testSourceIndex2 = "test_source_index2" + createIndex(testSourceIndex1, Settings.EMPTY) + createIndex(testSourceIndex2, Settings.EMPTY) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testSourceIndex1), listOf(docQuery)) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger) + ) + // This doc should create close to 1000 (limit) fields in index mapping. It's easier to add mappings like this then via api + val docPayload: StringBuilder = StringBuilder(100000) + docPayload.append("{") + for (i in 1..899) { + docPayload.append(""" "id$i":$i,""") + } + docPayload.append("\"test_field\" : \"us-west-2\" }") + // Indexing docs here as an easier means to set index mappings + indexDoc(testSourceIndex1, "1", docPayload.toString()) + indexDoc(testSourceIndex2, "1", docPayload.toString()) + // Create monitor + var monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + + // Update monitor and change input + val updatedMonitor = monitor.copy( + inputs = listOf( + DocLevelMonitorInput("description", listOf(testSourceIndex2), listOf(docQuery)) + ) + ) + updateMonitor(updatedMonitor, updatedMonitor.id) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + // Expect queryIndex to rollover after setting new source_index with close to limit amount of fields in mappings + var getIndexResponse: GetIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + assertEquals(2, getIndexResponse.indices.size) + + deleteMonitor(updatedMonitor.id) + waitUntil { + getIndexResponse = + client().admin().indices().getIndex(GetIndexRequest().indices(ScheduledJob.DOC_LEVEL_QUERIES_INDEX + "*")).get() + return@waitUntil getIndexResponse.indices.isEmpty() + } + assertEquals(0, getIndexResponse.indices.size) + } + fun `test queryIndex bwc when index was not an alias`() { createIndex(DOC_LEVEL_QUERIES_INDEX, Settings.builder().put("index.hidden", true).build()) assertIndexExists(DOC_LEVEL_QUERIES_INDEX) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt index 47d7805de..de8dc8319 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/AlertingSingleNodeTestCase.kt @@ -26,6 +26,7 @@ import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentType import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.DeleteMonitorRequest import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.action.IndexMonitorRequest @@ -154,6 +155,13 @@ abstract class AlertingSingleNodeTestCase : OpenSearchSingleNodeTestCase() { return client().execute(AlertingActions.INDEX_MONITOR_ACTION_TYPE, request).actionGet() } + protected fun deleteMonitor(monitorId: String): Boolean { + client().execute( + AlertingActions.DELETE_MONITOR_ACTION_TYPE, DeleteMonitorRequest(monitorId, WriteRequest.RefreshPolicy.IMMEDIATE) + ).get() + return true + } + protected fun searchAlerts(id: String, indices: String = AlertIndices.ALERT_INDEX, refresh: Boolean = true): List { try { if (refresh) refreshIndex(indices)