Skip to content

Commit

Permalink
Add Trigger condition resolver which parses and evaluates the Trigger…
Browse files Browse the repository at this point in the history
… expression.

Signed-off-by: Saurabh Singh <[email protected]>
  • Loading branch information
getsaurabh02 committed Apr 15, 2022
1 parent 5f4d9fd commit c18a9a3
Show file tree
Hide file tree
Showing 14 changed files with 597 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitor,
idQueryMap,
docsToQueries,
queryIds,
queryToDocIds,
dryrun
)
}
Expand All @@ -146,11 +146,11 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

logger.info("trigger results")
logger.info(triggerResult.triggeredDocs.toString())
Expand Down
36 changes: 13 additions & 23 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
Expand All @@ -18,20 +19,22 @@ import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.time.Instant

/** Service that handles executing Triggers */
class TriggerService(val scriptService: ScriptService) {

private val logger = LogManager.getLogger(TriggerService::class.java)
private val ALWAYS_RUN = Script("return true")
private val NEVER_RUN = Script("return false")

fun isQueryLevelTriggerActionable(ctx: QueryLevelTriggerExecutionContext, result: QueryLevelTriggerRunResult): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
Expand Down Expand Up @@ -60,31 +63,18 @@ class TriggerService(val scriptService: ScriptService) {
fun runDocLevelTrigger(
monitor: Monitor,
trigger: DocumentLevelTrigger,
ctx: DocumentLevelTriggerExecutionContext,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>
queryToDocIds: Map<DocLevelQuery, Set<String>>
): DocumentLevelTriggerRunResult {
return try {
val triggeredDocs = mutableListOf<String>()
var triggeredDocs = mutableListOf<String>()

val dummyTrigger = QueryLevelTrigger(
name = trigger.name,
severity = trigger.severity,
actions = trigger.actions,
condition = trigger.condition
)
val dummyExecutionContext = QueryLevelTriggerExecutionContext(monitor, dummyTrigger, emptyList(), Instant.now(), Instant.now())

for (doc in docsToQueries.keys) {
val params = trigger.condition.params.toMutableMap()
for (queryId in queryIds) {
params[queryId] = docsToQueries[doc]!!.contains(queryId)
if (trigger.condition.idOrCode.equals(ALWAYS_RUN.idOrCode)) {
for (value in queryToDocIds.values) {
triggeredDocs.addAll(value)
}
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(params)
.execute(dummyExecutionContext)
logger.info("trigger val: $triggered")
if (triggered) triggeredDocs.add(doc)
} else if (!trigger.condition.idOrCode.equals(NEVER_RUN.idOrCode)) {
triggeredDocs = TriggerExpressionParser(trigger.condition.idOrCode).parse()
.evaluate(queryToDocIds).toMutableList()
}

DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.resolvers.TriggerExpressionResolver

interface ExpressionParser {
fun parse(): TriggerExpressionResolver
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.resolvers.TriggerExpressionRPNResolver
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionOperator

/**
* The postfix (Reverse Polish Notation) parser.
* Uses the Shunting-yard algorithm to parse a mathematical expression
* @param triggerExpression String containing the trigger expression for the monitor
*/
class TriggerExpressionParser(
triggerExpression: String
) : TriggerExpressionRPNBaseParser(triggerExpression) {

override fun parse(): TriggerExpressionRPNResolver {
val expression = expressionToParse.replace(" ", "")

val splitters = ArrayList<String>()
TriggerExpressionOperator.values().forEach { splitters.add(it.value) }

val breaks = ArrayList<String>().apply { add(expression) }
for (s in splitters) {
val a = ArrayList<String>()
for (ind in 0 until breaks.size) {
breaks[ind].let {
if (it.length > 1) {
a.addAll(breakString(breaks[ind], s))
} else a.add(it)
}
}
breaks.clear()
breaks.addAll(a)
}

return TriggerExpressionRPNResolver(convertInfixToPostfix(breaks))
}

private fun breakString(input: String, delimeter: String): ArrayList<String> {
val tokens = input.split(delimeter)
val array = ArrayList<String>()
for (t in tokens) {
array.add(t)
array.add(delimeter)
}
array.removeAt(array.size - 1)
return array
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.tokens.ExpressionToken
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionConstant
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionOperator
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionToken
import java.util.Stack

/**
* This is the abstract base class which holds the trigger expression parsing logic;
* using the Infix to Postfix a.k.a. Reverse Polish Notation (RPN) parser.
* It also uses the Shunting-Yard algorithm to parse the given trigger expression.
*
* @param expressionToParse Complete string containing the trigger expression
*/
abstract class TriggerExpressionRPNBaseParser(
protected val expressionToParse: String
) : ExpressionParser {
/**
* To perform the Infix-to-postfix conversion of the trigger expression
*/
protected fun convertInfixToPostfix(expTokens: List<String>): ArrayList<ExpressionToken> {
val expTokenStack = Stack<ExpressionToken>()
val outputExpTokens = ArrayList<ExpressionToken>()

for (tokenString in expTokens) {
if (tokenString.isEmpty()) continue
when (val expToken = assignToken(tokenString)) {
is TriggerExpressionToken -> outputExpTokens.add(expToken)
is TriggerExpressionOperator -> {
when (expToken) {
TriggerExpressionOperator.PAR_LEFT -> expTokenStack.push(expToken)
TriggerExpressionOperator.PAR_RIGHT -> {
var topExpToken = expTokenStack.popExpTokenOrNull<TriggerExpressionOperator>()
while (topExpToken != null && topExpToken != TriggerExpressionOperator.PAR_LEFT) {
outputExpTokens.add(topExpToken)
topExpToken = expTokenStack.popExpTokenOrNull<TriggerExpressionOperator>()
}
if (topExpToken != TriggerExpressionOperator.PAR_LEFT)
throw java.lang.IllegalArgumentException("No matching left parenthesis.")
}
else -> {
var op2 = expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()
while (op2 != null) {
val c = expToken.precedence.compareTo(op2.precedence)
if (c < 0 || !expToken.rightAssociative && c <= 0) {
outputExpTokens.add(expTokenStack.pop())
} else {
break
}
op2 = expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()
}
expTokenStack.push(expToken)
}
}
}
}
}

while (!expTokenStack.isEmpty()) {
expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()?.let {
if (it == TriggerExpressionOperator.PAR_LEFT)
throw java.lang.IllegalArgumentException("No matching right parenthesis.")
}
val top = expTokenStack.pop()
outputExpTokens.add(top)
}

return outputExpTokens
}

/**
* Looks up and maps the expression token that matches the string version of that expression unit
*/
private fun assignToken(tokenString: String): ExpressionToken {

// Check "query" string in trigger expression such as in 'query[name="abc"]'
if (tokenString.startsWith(TriggerExpressionConstant.ConstantType.QUERY.ident))
return TriggerExpressionToken(tokenString)

// Check operators in trigger expression such as in [&&, ||, !]
for (op in TriggerExpressionOperator.values()) {
if (op.value == tokenString) return op
}

// Check any constants in trigger expression such as in ["name, "id", "tag", [", "]", "="]
for (con in TriggerExpressionConstant.ConstantType.values()) {
if (tokenString == con.ident) return TriggerExpressionConstant(con)
}

throw IllegalArgumentException("Error while processing the trigger expression '$tokenString'")
}

private inline fun <reified T> Stack<ExpressionToken>.popExpTokenOrNull(): T? {
return try {
pop() as T
} catch (e: java.lang.Exception) {
null
}
}

private inline fun <reified T> Stack<ExpressionToken>.peekExpTokenOrNull(): T? {
return try {
peek() as T
} catch (e: java.lang.Exception) {
null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.resolvers

sealed class TriggerExpression {

fun resolve(): Set<String> = when (this) {
is And -> resolveAnd(docSet1, docSet2)
is Or -> resolveOr(docSet1, docSet2)
is Not -> resolveNot(allDocs, docSet2)
}

private fun resolveAnd(documentSet1: Set<String>, documentSet2: Set<String>): Set<String> {
return documentSet1.intersect(documentSet2)
}

private fun resolveOr(documentSet1: Set<String>, documentSet2: Set<String>): Set<String> {
return documentSet1.union(documentSet2)
}

private fun resolveNot(allDocs: Set<String>, documentSet2: Set<String>): Set<String> {
return allDocs.subtract(documentSet2)
}

// Operators implemented as operator functions
class And(val docSet1: Set<String>, val docSet2: Set<String>) : TriggerExpression()
class Or(val docSet1: Set<String>, val docSet2: Set<String>) : TriggerExpression()
class Not(val allDocs: Set<String>, val docSet2: Set<String>) : TriggerExpression()
}
Loading

0 comments on commit c18a9a3

Please sign in to comment.