Skip to content

Commit

Permalink
fix alert constructor with noop trigger to use execution id and workf…
Browse files Browse the repository at this point in the history
…low id (#981)

Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep authored Jul 8, 2023
1 parent 62d2524 commit 6da86fe
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 15 deletions.
3 changes: 0 additions & 3 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ configurations.all {
force "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
force "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents.core5:httpcore5:5.1.4"
// force the version until OpenSearch upgrade to an invulnerable one, https://www.whitesourcesoftware.com/vulnerability-database/WS-2019-0379
force "commons-codec:commons-codec:1.13"

Expand Down Expand Up @@ -113,8 +112,6 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
implementation "org.jetbrains:annotations:13.0"

implementation "org.apache.httpcomponents.core5:httpcore5:5.1.4"

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.3.3"

Expand Down
19 changes: 15 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.MAX_SEARCH_SIZE
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.unit.TimeValue
Expand Down Expand Up @@ -207,13 +208,17 @@ class AlertService(
fun composeMonitorErrorAlert(
id: String,
monitor: Monitor,
alertError: AlertError
alertError: AlertError,
executionId: String?,
workflowRunContext: WorkflowRunContext?
): Alert {
val currentTime = Instant.now()
return Alert(
id = id, monitor = monitor, trigger = NoOpTrigger(), startTime = currentTime,
lastNotificationTime = currentTime, state = Alert.State.ERROR, errorMessage = alertError?.message,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
workflowId = workflowRunContext?.workflowId ?: "",
executionId = executionId ?: ""
)
}

Expand Down Expand Up @@ -311,7 +316,12 @@ class AlertService(
} ?: listOf()
}

suspend fun upsertMonitorErrorAlert(monitor: Monitor, errorMessage: String) {
suspend fun upsertMonitorErrorAlert(
monitor: Monitor,
errorMessage: String,
executionId: String?,
workflowRunContext: WorkflowRunContext?,
) {
val newErrorAlertId = "$ERROR_ALERT_ID_PREFIX-${monitor.id}-${UUID.randomUUID()}"

val searchRequest = SearchRequest(monitor.dataSources.alertsIndex)
Expand All @@ -326,7 +336,8 @@ class AlertService(
)
val searchResponse: SearchResponse = client.suspendUntil { search(searchRequest, it) }

var alert = composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage))
var alert =
composeMonitorErrorAlert(newErrorAlertId, monitor, AlertError(Instant.now(), errorMessage), executionId, workflowRunContext)

if (searchResponse.hits.totalHits.value > 0L) {
if (searchResponse.hits.totalHits.value > 1L) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor = monitor, errorMessage = errorMessage)
monitorCtx.alertService!!.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = workflowRunContext?.executionId,
workflowRunContext
)
} else {
onSuccessfulMonitorRun(monitorCtx, monitor)
}
Expand All @@ -263,7 +268,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return monitorResult.copy(triggerResults = triggerResults)
} catch (e: Exception) {
val errorMessage = ExceptionsHelper.detailedMessage(e)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage)
monitorCtx.alertService!!.upsertMonitorErrorAlert(monitor, errorMessage, workflowRunContext?.executionId, workflowRunContext)
logger.error("Failed running Document-level-monitor ${monitor.name}", e)
val alertingException = AlertingException(
errorMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
"monitor_id": {
"type": "keyword"
},
"workflow_id": {
"type": "keyword"
},
"monitor_version": {
"type": "long"
},
Expand All @@ -28,9 +25,6 @@
"severity": {
"type": "keyword"
},
"execution_id": {
"type": "keyword"
},
"monitor_name": {
"type": "text",
"fields": {
Expand Down Expand Up @@ -77,6 +71,15 @@
}
}
},
"execution_id": {
"type": "keyword"
},
"workflow_id": {
"type": "keyword"
},
"workflow_name": {
"type": "keyword"
},
"trigger_id": {
"type": "keyword"
},
Expand All @@ -97,6 +100,14 @@
}
}
},
"associated_alert_ids": {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
},
"related_doc_ids": {
"type" : "text",
"fields" : {
Expand Down

0 comments on commit 6da86fe

Please sign in to comment.