Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Trigger condition parser and resolver for Doc Level Alerts #405

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitor,
idQueryMap,
docsToQueries,
queryIds,
queryToDocIds,
dryrun
)
}
Expand All @@ -146,11 +146,11 @@ object DocumentReturningMonitorRunner : MonitorRunner {
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

logger.info("trigger results")
logger.info(triggerResult.triggeredDocs.toString())
Expand Down
36 changes: 13 additions & 23 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
Expand All @@ -18,20 +19,22 @@ import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.time.Instant

/** Service that handles executing Triggers */
class TriggerService(val scriptService: ScriptService) {

private val logger = LogManager.getLogger(TriggerService::class.java)
private val ALWAYS_RUN = Script("return true")
private val NEVER_RUN = Script("return false")

fun isQueryLevelTriggerActionable(ctx: QueryLevelTriggerExecutionContext, result: QueryLevelTriggerRunResult): Boolean {
// Suppress actions if the current alert is acknowledged and there are no errors.
Expand Down Expand Up @@ -60,31 +63,18 @@ class TriggerService(val scriptService: ScriptService) {
fun runDocLevelTrigger(
monitor: Monitor,
trigger: DocumentLevelTrigger,
ctx: DocumentLevelTriggerExecutionContext,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>
queryToDocIds: Map<DocLevelQuery, Set<String>>
): DocumentLevelTriggerRunResult {
return try {
val triggeredDocs = mutableListOf<String>()
var triggeredDocs = mutableListOf<String>()

val dummyTrigger = QueryLevelTrigger(
name = trigger.name,
severity = trigger.severity,
actions = trigger.actions,
condition = trigger.condition
)
val dummyExecutionContext = QueryLevelTriggerExecutionContext(monitor, dummyTrigger, emptyList(), Instant.now(), Instant.now())

for (doc in docsToQueries.keys) {
val params = trigger.condition.params.toMutableMap()
for (queryId in queryIds) {
params[queryId] = docsToQueries[doc]!!.contains(queryId)
if (trigger.condition.idOrCode.equals(ALWAYS_RUN.idOrCode)) {
for (value in queryToDocIds.values) {
triggeredDocs.addAll(value)
}
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(params)
.execute(dummyExecutionContext)
logger.info("trigger val: $triggered")
if (triggered) triggeredDocs.add(doc)
} else if (!trigger.condition.idOrCode.equals(NEVER_RUN.idOrCode)) {
triggeredDocs = TriggerExpressionParser(trigger.condition.idOrCode).parse()
.evaluate(queryToDocIds).toMutableList()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the if-else if block moves the object equals check to runtime. Kotlin allows this check to be made at compile time using when control statements.

when (trigger.condition.idOrCode) {
                ALWAYS_RUN.idOrCode  -> {
                    for (value in queryToDocIds.values) {
                        triggeredDocs.addAll(value)
                    }
                }
                NEVER_RUN.idOrCode -> {
                    triggeredDocs = TriggerExpressionParser(trigger.condition.idOrCode).parse()
                        .evaluate(queryToDocIds).toMutableList()
                }
            }

this will allow catching runtime errors we faced today at compile time only.

Copy link
Member Author

@getsaurabh02 getsaurabh02 Apr 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion @sbcd90. Agree, control statement could make the flow look much cleaner and I have used it in the Parser/Evaluation logic. However, here since this is not an instance check, but rather the value check with string comparison, I am not sure having control statement would add any value here in terms of performance. Given string value comparison still need to be performed at runtime.

Since we weren't using all the three possible values here, I chose to keep it as if-else block instead.

}

DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.resolvers.TriggerExpressionResolver

interface ExpressionParser {
fun parse(): TriggerExpressionResolver
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.resolvers.TriggerExpressionRPNResolver
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionOperator

/**
* The postfix (Reverse Polish Notation) parser.
* Uses the Shunting-yard algorithm to parse a mathematical expression
* @param triggerExpression String containing the trigger expression for the monitor
*/
class TriggerExpressionParser(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the class name be changed to TriggerRPNExpressionParser?
parse() returns TriggerExpressionRPNResolver
tomorrow if someone wants to have another implementation of interface ExpressionParser

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since currently there is only one concrete implementation of the Trigger Expression Parser have kept the name as is for clarity. However, the base implementation which holds the RPN logic is named as the TriggerExpressionRPNBaseParser.
Later addition of new base implementations are possible using the new strategy, while the concrete implementation could still expose them via different method interfaces.

triggerExpression: String
) : TriggerExpressionRPNBaseParser(triggerExpression) {

override fun parse(): TriggerExpressionRPNResolver {
val expression = expressionToParse.replace(" ", "")

val splitters = ArrayList<String>()
TriggerExpressionOperator.values().forEach { splitters.add(it.value) }

val breaks = ArrayList<String>().apply { add(expression) }
for (s in splitters) {
val a = ArrayList<String>()
for (ind in 0 until breaks.size) {
breaks[ind].let {
if (it.length > 1) {
a.addAll(breakString(breaks[ind], s))
} else a.add(it)
}
}
breaks.clear()
breaks.addAll(a)
}

return TriggerExpressionRPNResolver(convertInfixToPostfix(breaks))
}

private fun breakString(input: String, delimeter: String): ArrayList<String> {
val tokens = input.split(delimeter)
val array = ArrayList<String>()
for (t in tokens) {
array.add(t)
array.add(delimeter)
}
array.removeAt(array.size - 1)
return array
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.parsers

import org.opensearch.alerting.triggercondition.tokens.ExpressionToken
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionConstant
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionOperator
import org.opensearch.alerting.triggercondition.tokens.TriggerExpressionToken
import java.util.Stack

/**
* This is the abstract base class which holds the trigger expression parsing logic;
* using the Infix to Postfix a.k.a. Reverse Polish Notation (RPN) parser.
* It also uses the Shunting-Yard algorithm to parse the given trigger expression.
*
* @param expressionToParse Complete string containing the trigger expression
*/
abstract class TriggerExpressionRPNBaseParser(
protected val expressionToParse: String
) : ExpressionParser {
/**
* To perform the Infix-to-postfix conversion of the trigger expression
*/
protected fun convertInfixToPostfix(expTokens: List<String>): ArrayList<ExpressionToken> {
val expTokenStack = Stack<ExpressionToken>()
val outputExpTokens = ArrayList<ExpressionToken>()

for (tokenString in expTokens) {
if (tokenString.isEmpty()) continue
when (val expToken = assignToken(tokenString)) {
is TriggerExpressionToken -> outputExpTokens.add(expToken)
is TriggerExpressionOperator -> {
when (expToken) {
TriggerExpressionOperator.PAR_LEFT -> expTokenStack.push(expToken)
TriggerExpressionOperator.PAR_RIGHT -> {
var topExpToken = expTokenStack.popExpTokenOrNull<TriggerExpressionOperator>()
while (topExpToken != null && topExpToken != TriggerExpressionOperator.PAR_LEFT) {
outputExpTokens.add(topExpToken)
topExpToken = expTokenStack.popExpTokenOrNull<TriggerExpressionOperator>()
}
if (topExpToken != TriggerExpressionOperator.PAR_LEFT)
throw java.lang.IllegalArgumentException("No matching left parenthesis.")
}
else -> {
var op2 = expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()
while (op2 != null) {
val c = expToken.precedence.compareTo(op2.precedence)
if (c < 0 || !expToken.rightAssociative && c <= 0) {
outputExpTokens.add(expTokenStack.pop())
} else {
break
}
op2 = expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()
}
expTokenStack.push(expToken)
}
}
}
}
}

while (!expTokenStack.isEmpty()) {
expTokenStack.peekExpTokenOrNull<TriggerExpressionOperator>()?.let {
if (it == TriggerExpressionOperator.PAR_LEFT)
throw java.lang.IllegalArgumentException("No matching right parenthesis.")
}
val top = expTokenStack.pop()
outputExpTokens.add(top)
}

return outputExpTokens
}

/**
* Looks up and maps the expression token that matches the string version of that expression unit
*/
private fun assignToken(tokenString: String): ExpressionToken {

// Check "query" string in trigger expression such as in 'query[name="abc"]'
if (tokenString.startsWith(TriggerExpressionConstant.ConstantType.QUERY.ident))
return TriggerExpressionToken(tokenString)

// Check operators in trigger expression such as in [&&, ||, !]
for (op in TriggerExpressionOperator.values()) {
if (op.value == tokenString) return op
}

// Check any constants in trigger expression such as in ["name, "id", "tag", [", "]", "="]
for (con in TriggerExpressionConstant.ConstantType.values()) {
if (tokenString == con.ident) return TriggerExpressionConstant(con)
}

throw IllegalArgumentException("Error while processing the trigger expression '$tokenString'")
}

private inline fun <reified T> Stack<ExpressionToken>.popExpTokenOrNull(): T? {
return try {
pop() as T
} catch (e: java.lang.Exception) {
null
}
}

private inline fun <reified T> Stack<ExpressionToken>.peekExpTokenOrNull(): T? {
return try {
peek() as T
} catch (e: java.lang.Exception) {
null
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.triggercondition.resolvers

sealed class TriggerExpression {

fun resolve(): Set<String> = when (this) {
is And -> resolveAnd(docSet1, docSet2)
is Or -> resolveOr(docSet1, docSet2)
is Not -> resolveNot(allDocs, docSet2)
}

private fun resolveAnd(documentSet1: Set<String>, documentSet2: Set<String>): Set<String> {
return documentSet1.intersect(documentSet2)
}

private fun resolveOr(documentSet1: Set<String>, documentSet2: Set<String>): Set<String> {
return documentSet1.union(documentSet2)
}

private fun resolveNot(allDocs: Set<String>, documentSet2: Set<String>): Set<String> {
return allDocs.subtract(documentSet2)
}

// Operators implemented as operator functions
class And(val docSet1: Set<String>, val docSet2: Set<String>) : TriggerExpression()
class Or(val docSet1: Set<String>, val docSet2: Set<String>) : TriggerExpression()
class Not(val allDocs: Set<String>, val docSet2: Set<String>) : TriggerExpression()
}
Loading