From 452a51c66ee5d4778ef332f1b17ef2f0109c15bc Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 25 Sep 2024 03:14:22 +0000 Subject: [PATCH 1/4] separate doc-level monitor query indices for externally defined monitors Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 15 +++ .../alerting/service/DeleteMonitorService.kt | 96 +++++++++++-------- .../transport/TransportIndexMonitorAction.kt | 24 +++-- .../alerting/util/DocLevelMonitorQueries.kt | 11 +++ .../alerting/MonitorDataSourcesIT.kt | 7 +- .../bwc/AlertingBackwardsCompatibilityIT.kt | 1 + 6 files changed, 103 insertions(+), 51 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index f28b03292..c79c8b8c2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -374,6 +374,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // Clean up any queries created by the dry run monitor monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } + + if (monitor.dataSources.queryIndex.contains("optimized")) { + val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) + if (!ack) { + logger.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") + } + } // TODO: Update the Document as part of the Trigger and return back the trigger action result return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults) } catch (e: Exception) { @@ -385,6 +392,14 @@ class DocumentLevelMonitorRunner : MonitorRunner() { RestStatus.INTERNAL_SERVER_ERROR, e ) + if (monitor.dataSources.queryIndex.contains("optimized") && + monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources) + ) { + val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) + if (!ack) { + logger.error("Retry deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") + } + } return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) } finally { val endTime = System.currentTimeMillis() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt index 8c6eb4b69..0505a2dff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -95,57 +95,69 @@ object DeleteMonitorService : private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { try { - val metadata = MonitorMetadataService.getMetadata(monitor) - metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> + if (monitor.owner == "alerting") { + val metadata = MonitorMetadataService.getMetadata(monitor) + metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> - val indicesExistsResponse: IndicesExistsResponse = - client.suspendUntil { - client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return } - if (indicesExistsResponse.isExists == false) { - return - } - // Check if there's any queries from other monitors in this queryIndex, - // to avoid unnecessary doc deletion, if we could just delete index completely - val searchResponse: SearchResponse = client.suspendUntil { - search( - SearchRequest(queryIndex).source( - SearchSourceBuilder() - .size(0) - .query( - QueryBuilders.boolQuery().mustNot( - QueryBuilders.matchQuery("monitor_id", monitor.id) + // Check if there's any queries from other monitors in this queryIndex, + // to avoid unnecessary doc deletion, if we could just delete index completely + val searchResponse: SearchResponse = client.suspendUntil { + search( + SearchRequest(queryIndex).source( + SearchSourceBuilder() + .size(0) + .query( + QueryBuilders.boolQuery().mustNot( + QueryBuilders.matchQuery("monitor_id", monitor.id) + ) ) - ) - ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), - it - ) - } - if (searchResponse.hits.totalHits.value == 0L) { - val ack: AcknowledgedResponse = client.suspendUntil { - client.admin().indices().delete( - DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + ).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), it ) } - if (ack.isAcknowledged == false) { - log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") - } - } else { - // Delete all queries added by this monitor - val response: BulkByScrollResponse = suspendCoroutine { cont -> - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) - .refresh(true) - .execute( - object : ActionListener { - override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) - override fun onFailure(t: Exception) = cont.resumeWithException(t) - } + if (searchResponse.hits.totalHits.value == 0L) { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:$queryIndex is not ack'd!") + } + } else { + // Delete all queries added by this monitor + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } } } + } else { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(monitor.dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + if (ack.isAcknowledged == false) { + log.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") + } } } catch (e: Exception) { // we only log the error and don't fail the request because if monitor document has been deleted successfully, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 36e1d83f4..1447a9b9b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -537,7 +537,8 @@ class TransportIndexMonitorAction @Inject constructor( if ( request.monitor.isMonitorOfStandardType() && Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == - Monitor.MonitorType.DOC_LEVEL_MONITOR + Monitor.MonitorType.DOC_LEVEL_MONITOR && + request.monitor.owner == "alerting" ) { indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } @@ -702,13 +703,22 @@ class TransportIndexMonitorAction @Inject constructor( Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR ) { updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) - client.suspendUntil { - DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) - .source(currentMonitor.dataSources.queryIndex) - .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) - .execute(it) + if (docLevelMonitorQueries.docLevelQueryIndexExists(currentMonitor.dataSources)) { + client.suspendUntil { + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(currentMonitor.dataSources.queryIndex) + .filter(QueryBuilders.matchQuery("monitor_id", currentMonitor.id)) + .execute(it) + } + } + if (currentMonitor.owner == "alerting") { + indexDocLevelMonitorQueries( + request.monitor, + currentMonitor.id, + updatedMetadata, + request.refreshPolicy + ) } - indexDocLevelMonitorQueries(request.monitor, currentMonitor.id, updatedMetadata, request.refreshPolicy) MonitorMetadataService.upsertMetadata(updatedMetadata, updating = true) } actionListener.onResponse( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 60daaa4cc..de84e5051 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -24,6 +24,7 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest +import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorRunnerService.monitorCtx @@ -181,6 +182,16 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } } + suspend fun deleteDocLevelQueryIndex(dataSources: DataSources): Boolean { + val ack: AcknowledgedResponse = client.suspendUntil { + client.admin().indices().delete( + DeleteIndexRequest(dataSources.queryIndex).indicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_HIDDEN), + it + ) + } + return ack.isAcknowledged + } + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() return clusterState.metadata.hasAlias(dataSources.queryIndex) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 7d16cbabe..a4e5ad08f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -1195,7 +1195,8 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { dataSources = DataSources( queryIndex = customQueryIndex, queryIndexMappingsByType = mapOf(Pair("text", mapOf(Pair("analyzer", analyzer)))), - ) + ), + owner = "alerting" ) try { createMonitor(monitor) @@ -2379,7 +2380,9 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) var monitor = randomDocumentLevelMonitor( inputs = listOf(docLevelInput), - triggers = listOf(trigger) + triggers = listOf(trigger), + dataSources = DataSources(), + owner = "alerting" ) // This doc should create close to 10000 (limit) fields in index mapping. It's easier to add mappings like this then via api val docPayload: StringBuilder = StringBuilder(100000) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt index 280074964..c2f6b5fd0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt @@ -166,6 +166,7 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { val indexName = "test_bwc_index" val bwcMonitorString = """ { + "owner": "alerting", "type": "monitor", "name": "test_bwc_monitor", "enabled": true, From b280761831bc8accac799a316202a7e49899456e Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 25 Sep 2024 10:05:52 +0000 Subject: [PATCH 2/4] separate doc-level monitor query indices for externally defined monitors Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 14 ++++---------- .../alerting/service/DeleteMonitorService.kt | 2 +- .../transport/TransportIndexMonitorAction.kt | 4 ++-- .../alerting/util/DocLevelMonitorQueries.kt | 1 + .../alerting/SampleRemoteMonitorRestHandler.java | 3 +++ 5 files changed, 11 insertions(+), 13 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index c79c8b8c2..90375b0e4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -375,12 +375,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } - if (monitor.dataSources.queryIndex.contains("optimized")) { - val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) - if (!ack) { - logger.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") - } - } // TODO: Update the Document as part of the Trigger and return back the trigger action result return monitorResult.copy(triggerResults = triggerResults, inputResults = inputRunResults) } catch (e: Exception) { @@ -392,16 +386,16 @@ class DocumentLevelMonitorRunner : MonitorRunner() { RestStatus.INTERNAL_SERVER_ERROR, e ) - if (monitor.dataSources.queryIndex.contains("optimized") && + return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + } finally { + if (monitor.deleteQueryIndexInEveryRun == true && monitorCtx.docLevelMonitorQueries!!.docLevelQueryIndexExists(monitor.dataSources) ) { val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) if (!ack) { - logger.error("Retry deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd!") + logger.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! for monitor ${monitor.id}") } } - return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) - } finally { val endTime = System.currentTimeMillis() totalTimeTakenStat = endTime - startTime logger.debug( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt index 0505a2dff..a0bccec0a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -95,7 +95,7 @@ object DeleteMonitorService : private suspend fun deleteDocLevelMonitorQueriesAndIndices(monitor: Monitor) { try { - if (monitor.owner == "alerting") { + if (monitor.deleteQueryIndexInEveryRun == false) { val metadata = MonitorMetadataService.getMetadata(monitor) metadata?.sourceToQueryIndexMapping?.forEach { (_, queryIndex) -> diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 1447a9b9b..aad64f1e3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -538,7 +538,7 @@ class TransportIndexMonitorAction @Inject constructor( request.monitor.isMonitorOfStandardType() && Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR && - request.monitor.owner == "alerting" + request.monitor.deleteQueryIndexInEveryRun == false ) { indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } @@ -711,7 +711,7 @@ class TransportIndexMonitorAction @Inject constructor( .execute(it) } } - if (currentMonitor.owner == "alerting") { + if (currentMonitor.deleteQueryIndexInEveryRun == false) { indexDocLevelMonitorQueries( request.monitor, currentMonitor.id, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index de84e5051..aaa17a56a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -445,6 +445,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ ) ) indexRequests.add(indexRequest) + log.debug("query $query added for execution of monitor $monitorId on index $sourceIndex") } log.debug("bulk inserting percolate [${queries.size}] queries") if (indexRequests.isNotEmpty()) { diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 0340baa6b..a8f384cb6 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -94,6 +94,7 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient "id", null)), trigger1Serialized)), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest1 = new IndexMonitorRequest( @@ -154,6 +155,7 @@ public void onFailure(Exception e) { List.of(), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexMonitorRequest2 = new IndexMonitorRequest( @@ -237,6 +239,7 @@ public void onFailure(Exception e) { "id", null)), trigger1Serialized)), Map.of(), new DataSources(), + true, "sample-remote-monitor-plugin" ); IndexMonitorRequest indexDocLevelMonitorRequest = new IndexMonitorRequest( From a15d5a15a5cca54102cb9c094d91c491038e207f Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 25 Sep 2024 21:08:41 +0000 Subject: [PATCH 3/4] update source to index mappings for detector upgrade scenarios Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/util/DocLevelMonitorQueries.kt | 7 ++++++- core/src/main/resources/mappings/scheduled-jobs.json | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index aaa17a56a..d2be2c8c0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -490,7 +490,12 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ updatedProperties: MutableMap ): Pair { var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] - if (targetQueryIndex == null) { + if ( + targetQueryIndex == null || ( + targetQueryIndex != monitor.dataSources.queryIndex && + monitor.deleteQueryIndexInEveryRun == true + ) + ) { // queryIndex is alias which will always have only 1 backing index which is writeIndex // This is due to a fact that that _rollover API would maintain only single index under alias // if you don't add is_write_index setting when creating index initially diff --git a/core/src/main/resources/mappings/scheduled-jobs.json b/core/src/main/resources/mappings/scheduled-jobs.json index 2651c862e..311cc6d84 100644 --- a/core/src/main/resources/mappings/scheduled-jobs.json +++ b/core/src/main/resources/mappings/scheduled-jobs.json @@ -293,6 +293,9 @@ } } }, + "delete_query_index_in_every_run": { + "type": "boolean" + }, "ui_metadata": { "type": "object", "enabled": false From 90620a08bf143808f8360fff2ae296b506421af5 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 25 Sep 2024 21:16:39 +0000 Subject: [PATCH 4/4] fix ktlint Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/DocumentLevelMonitorRunner.kt | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 90375b0e4..0164aa00e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -393,7 +393,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() { ) { val ack = monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueryIndex(monitor.dataSources) if (!ack) { - logger.error("Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! for monitor ${monitor.id}") + logger.error( + "Deletion of concrete queryIndex:${monitor.dataSources.queryIndex} is not ack'd! " + + "for monitor ${monitor.id}" + ) } } val endTime = System.currentTimeMillis()