Skip to content

Commit

Permalink
add setting for percolate query docs size memory threshold to perform…
Browse files Browse the repository at this point in the history
… percolate query

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Dec 27, 2023
1 parent 6c00e73 commit 447d76f
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.painless.spi.Allowlist
import org.opensearch.painless.spi.AllowlistLoader
import org.opensearch.painless.spi.PainlessExtension
Expand Down Expand Up @@ -257,6 +258,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
Expand Down Expand Up @@ -314,6 +316,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_MAX_DOCS,
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
AlertingSettings.REQUEST_TIMEOUT,
AlertingSettings.MAX_ACTION_THROTTLE_VALUE,
AlertingSettings.FILTER_BY_BACKEND_ROLES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
Expand Down Expand Up @@ -56,6 +57,7 @@ import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.Operator
import org.opensearch.index.query.QueryBuilders
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
Expand All @@ -64,6 +66,7 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
Expand Down Expand Up @@ -150,7 +153,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

/* Holds the sources of the docs that are kept in memory until they are submitted to a percolate query against the query index.
 * Docs are fetched from the source index per shard and transformed. */
val transformedDocs = mutableListOf<Pair<String, BytesReference>>()
val docsSizeInBytes = AtomicLong(0)
var lastUpdatedIndexName: String? = null
var lastConcreteIndexName: String? = null
docLevelMonitorInput.indices.forEach { indexName ->

var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
Expand All @@ -172,13 +182,15 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}
val updatedIndexName = indexName.replace("*", "_")
lastUpdatedIndexName = updatedIndexName
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
concreteIndices
)

concreteIndices.forEach { concreteIndexName ->
// Prepare lastRunContext for each index
lastConcreteIndexName = concreteIndexName
val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) {
val isIndexCreatedRecently = createdRecently(
monitor,
Expand Down Expand Up @@ -230,7 +242,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
matchingDocIdsPerIndex?.get(concreteIndexName),
monitorMetadata,
inputRunResults,
docsToQueries
docsToQueries,
transformedDocs,
docsSizeInBytes
)
}
}
Expand Down Expand Up @@ -564,6 +578,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(1)
)
val response: SearchResponse = client.suspendUntil { client.search(request, it) }
JvmStats.jvmStats()
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
Expand Down Expand Up @@ -594,10 +609,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
transformedDocs: MutableList<Pair<String, BytesReference>>,
docsSizeInBytes: AtomicLong,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
val transformedDocs = mutableListOf<Pair<String, BytesReference>>()
val shard = i.toString()
try {
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
Expand All @@ -618,7 +634,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexName,
concreteIndexName,
monitor.id,
conflictingFields
conflictingFields,
docsSizeInBytes
)
)
} catch (e: Exception) {
Expand All @@ -628,30 +645,73 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
e
)
}

if (transformedDocs.isNotEmpty()) {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
if (transformedDocs.isNotEmpty() || isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx)) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs.map { it.second },
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
indexName,
concreteIndexName
concreteIndexName,
inputRunResults,
docsToQueries
)
}
}
/* If all shards have been covered and the in-memory docs size limit has still not been breached, we need to
 * submit the percolate query at the end. */
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
indexName,
concreteIndexName,
inputRunResults,
docsToQueries
)
}
}

/**
 * Runs a percolate query for the docs currently buffered in [transformedDocs], records which queries matched
 * which docs, and then resets the in-memory buffer and its size counter.
 *
 * For every hit, the query id is derived from the hit id by removing the `_<indexName>_<monitorId>` and
 * `_<concreteIndexName>_<monitorId>` parts. The values of the hit's `_percolator_document_slot` field are used
 * as positions into [transformedDocs] to find the matching doc ids.
 *
 * @param monitorCtx execution context passed through to the percolate query call
 * @param transformedDocs buffered (doc id, doc source) pairs; cleared before this function returns
 * @param docsSizeInBytes running size counter for the buffered docs; set back to 0 before this function returns
 * @param monitor monitor being executed; its id is removed from hit ids
 * @param monitorMetadata metadata passed through to the percolate query call
 * @param indexName index name as used in the query ids
 * @param concreteIndexName concrete index the buffered docs are attributed to (`<docId>|<concreteIndexName>`)
 * @param inputRunResults output map: query id -> set of `<docId>|<concreteIndexName>` keys that matched it
 * @param docsToQueries output map: `<docId>|<concreteIndexName>` key -> list of query ids that matched it
 */
private suspend fun performPercolateQueryAndResetCounters(
    monitorCtx: MonitorRunnerExecutionContext,
    transformedDocs: MutableList<Pair<String, BytesReference>>,
    docsSizeInBytes: AtomicLong,
    monitor: Monitor,
    monitorMetadata: MonitorMetadata,
    indexName: String,
    concreteIndexName: String,
    inputRunResults: MutableMap<String, MutableSet<String>>,
    docsToQueries: MutableMap<String, MutableList<String>>,
) {
    try {
        val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
            monitorCtx,
            transformedDocs.map { it.second },
            monitor,
            monitorMetadata,
            indexName,
            concreteIndexName
        )

        percolateQueryResponseHits.forEach { hit ->
            val id = hit.id
                .replace("_${indexName}_${monitor.id}", "")
                .replace("_${concreteIndexName}_${monitor.id}", "")

            // Each slot value is the position of a matching doc in the list that was submitted above.
            val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
            docIndices.forEach { idx ->
                val docIndex = "${transformedDocs[idx].first}|$concreteIndexName"
                inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex)
                docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
            }
        }
    } finally { // no catch block because exception is caught and handled in runMonitor()
        // Always reset, even on failure, so the same docs are not re-submitted with the next batch.
        transformedDocs.clear()
        docsSizeInBytes.set(0)
    }
}

Expand Down Expand Up @@ -737,7 +797,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
} catch (e: Exception) {
throw IllegalStateException(
"Monitor ${monitor.id}: Failed to run percolate search for sourceIndex [$index] " +
"and queryIndex [$queryIndex] for ${docs.size} document(s)", e
"and queryIndex [$queryIndex] for ${docs.size} document(s)",
e
)
}

Expand All @@ -757,6 +818,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>,
docsSizeInBytes: AtomicLong,
): List<Pair<String, BytesReference>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, BytesReference>? {
try {
Expand All @@ -770,9 +832,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)
val sourceRef = BytesReference.bytes(xContentBuilder)
logger.debug(
"Monitor $monitorId: Document [${hit.id}] payload after transform for percolate query: ${sourceRef.utf8ToString()}",
)
docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed())
return Pair(hit.id, sourceRef)
} catch (e: Exception) {
logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e)
Expand Down Expand Up @@ -832,4 +892,20 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
jsonAsMap.putAll(tempMap)
}

/**
 * Returns true if the docs fetched from shards thus far occupy more than the threshold percentage of the
 * maximum heap size, and false otherwise.
 *
 * The threshold percentage is read from [PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT] (default: 10).
 * A configured value outside 0..100 is ignored and the setting's default is used instead.
 *
 * @param docsBytesSize size in bytes of the docs currently held in memory
 * @param monitorCtx execution context providing the settings and, when registered, a JVM stats snapshot
 */
private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean {
    // NOTE(review): the value is read from monitorCtx.settings; confirm that dynamic updates of this
    // setting are reflected there.
    val thresholdPercentage = PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings)
        .takeIf { it in 0..100 }
        ?: PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.getDefault(monitorCtx.settings)
    // Reuse the JvmStats registered on the context instead of building a new stats object on every check;
    // fall back to a fresh one only if none was registered.
    val heapMaxBytes = (monitorCtx.jvmStats ?: JvmStats.jvmStats()).mem.heapMax.bytes
    val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes

    return docsBytesSize > thresholdBytes
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.ScriptService
import org.opensearch.threadpool.ThreadPool

Expand All @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext(
var alertService: AlertService? = null,
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var jvmStats: JvmStats? = null,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.core.action.ActionListener
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.monitor.jvm.JvmStats
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
Expand Down Expand Up @@ -132,6 +133,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

/**
 * Stores the given [JvmStats] on the monitor execution context.
 *
 * @param jvmStats the stats object to keep on `monitorCtx`
 * @return this [MonitorRunnerService], so that register calls can be chained
 */
fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService = apply {
    monitorCtx.jvmStats = jvmStats
}

// Must be called after registerClusterService and registerSettings in AlertingPlugin
fun registerConsumers(): MonitorRunnerService {
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

/**
 * Defines the threshold, as a percentage (0-100), of the docs accumulated in memory to query against the
 * percolate query index in document level monitor execution. The docs are collected by searching the
 * shards of the indices mentioned in the monitor input indices field.
 *
 * The value is bounded to 0..100 so that an invalid percentage is rejected when the setting is applied.
 */
val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting(
    "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit",
    10,
    0,
    100,
    Setting.Property.NodeScope, Setting.Property.Dynamic
)

val INPUT_TIMEOUT = Setting.positiveTimeSetting(
"plugins.alerting.input_timeout",
LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT,
Expand Down

0 comments on commit 447d76f

Please sign in to comment.