diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 780971f7e..a1ffc44e4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -25,8 +25,6 @@ import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.userErrorMessage import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT -import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction @@ -237,25 +235,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } val fieldsToBeQueried = mutableSetOf() - - for (it in queries) { - if (it.queryFieldNames.isEmpty()) { - fieldsToBeQueried.clear() + if (monitorCtx.fetchOnlyQueryFieldNames) { + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } + if (fieldsToBeQueried.isNotEmpty()) logger.debug( - "Monitor ${monitor.id} : " + - "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + - "Cannot optimize monitor to fetch only query-relevant fields. " + - "Querying entire doc source." + "Monitor ${monitor.id} Querying only fields " + + "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" ) - break - } - fieldsToBeQueried.addAll(it.queryFieldNames) - } - if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToBeQueried.isNotEmpty()) { - logger.debug( - "Monitor ${monitor.id} Querying only fields " + - "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" - ) } // Prepare DocumentExecutionContext for each index @@ -1028,7 +1026,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { * */ private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes @@ -1036,7 +1034,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { - var maxNumDocsThreshold = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory return numDocs >= maxNumDocsThreshold } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 01010a424..043ae88d4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -52,4 +52,7 @@ data class MonitorRunnerExecutionContext( @Volatile var indexTimeout: TimeValue? = null, @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, @Volatile var fetchOnlyQueryFieldNames: Boolean = true, + @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, + @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = + AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 3af0e49a1..9cd3c2401 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -31,6 +31,8 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings @@ -188,6 +190,18 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.fetchOnlyQueryFieldNames = it } + monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { + monitorCtx.percQueryMaxNumDocsInMemory = it + } + + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = + PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) { + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index de5b3bc81..19059dc1d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -18,6 +18,8 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 + const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 + const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -44,7 +46,7 @@ class AlertingSettings { */ val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", - 50000, 1000, + DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000, Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 2928be59a..9406addf5 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -219,7 +219,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } val alerts = searchAlerts(monitor) - assertEquals("Alert saved for test monitor", 1, alerts.size) + assertEquals("Alert saved for test monitor", 0, alerts.size) } fun `test execute monitor returns search result with dryrun`() {