Skip to content

Commit

Permalink
Implemented cross-cluster monitor support (#584)
Browse files Browse the repository at this point in the history
Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt authored Feb 6, 2024
1 parent a632b47 commit 0c23cc5
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 16 deletions.
46 changes: 37 additions & 9 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Alert.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ data class Alert(
val aggregationResultBucket: AggregationResultBucket? = null,
val executionId: String? = null,
val associatedAlertIds: List<String>,
val clusters: List<String>? = null,
) : Writeable, ToXContent {

init {
Expand All @@ -61,6 +62,7 @@ data class Alert(
chainedAlertTrigger: ChainedAlertTrigger,
workflow: Workflow,
associatedAlertIds: List<String>,
clusters: List<String>? = null
) : this(
monitorId = NO_ID,
monitorName = "",
Expand All @@ -82,7 +84,8 @@ data class Alert(
executionId = executionId,
workflowId = workflow.id,
workflowName = workflow.name,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = clusters
)

constructor(
Expand All @@ -97,6 +100,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -118,7 +122,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -134,6 +139,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -155,7 +161,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -172,6 +179,7 @@ data class Alert(
findingIds: List<String> = emptyList(),
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
monitorId = monitor.id,
monitorName = monitor.name,
Expand All @@ -193,7 +201,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -211,6 +220,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
executionId: String? = null,
workflowId: String? = null,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -233,7 +243,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId ?: "",
workflowName = "",
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

constructor(
Expand All @@ -248,6 +259,7 @@ data class Alert(
schemaVersion: Int = NO_SCHEMA_VERSION,
workflowId: String? = null,
executionId: String?,
clusters: List<String>? = null
) : this(
id = id,
monitorId = monitor.id,
Expand All @@ -270,7 +282,8 @@ data class Alert(
relatedDocIds = listOf(),
workflowId = workflowId ?: "",
executionId = executionId,
associatedAlertIds = emptyList()
associatedAlertIds = emptyList(),
clusters = clusters
)

enum class State {
Expand Down Expand Up @@ -311,7 +324,8 @@ data class Alert(
actionExecutionResults = sin.readList(::ActionExecutionResult),
aggregationResultBucket = if (sin.readBoolean()) AggregationResultBucket(sin) else null,
executionId = sin.readOptionalString(),
associatedAlertIds = sin.readStringList()
associatedAlertIds = sin.readStringList(),
clusters = sin.readOptionalStringList()
)

fun isAcknowledged(): Boolean = (state == State.ACKNOWLEDGED)
Expand Down Expand Up @@ -349,6 +363,7 @@ data class Alert(
}
out.writeOptionalString(executionId)
out.writeStringCollection(associatedAlertIds)
out.writeOptionalStringArray(clusters?.toTypedArray())
}

companion object {
Expand Down Expand Up @@ -379,6 +394,7 @@ data class Alert(
const val ASSOCIATED_ALERT_IDS_FIELD = "associated_alert_ids"
const val BUCKET_KEYS = AggregationResultBucket.BUCKET_KEYS
const val PARENTS_BUCKET_PATH = AggregationResultBucket.PARENTS_BUCKET_PATH
const val CLUSTERS_FIELD = "clusters"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

Expand Down Expand Up @@ -410,6 +426,7 @@ data class Alert(
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
var aggAlertBucket: AggregationResultBucket? = null
val associatedAlertIds = mutableListOf<String>()
val clusters = mutableListOf<String>()
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
Expand Down Expand Up @@ -476,6 +493,12 @@ data class Alert(
AggregationResultBucket.parse(xcp)
}
}
CLUSTERS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
clusters.add(xcp.text())
}
}
}
}

Expand Down Expand Up @@ -504,7 +527,8 @@ data class Alert(
executionId = executionId,
workflowId = workflowId,
workflowName = workflowName,
associatedAlertIds = associatedAlertIds
associatedAlertIds = associatedAlertIds,
clusters = if (clusters.size > 0) clusters else null
)
}

Expand Down Expand Up @@ -554,6 +578,9 @@ data class Alert(
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
aggregationResultBucket?.innerXContent(builder)

if (!clusters.isNullOrEmpty()) builder.field(CLUSTERS_FIELD, clusters.toTypedArray())

builder.endObject()
return builder
}
Expand All @@ -577,7 +604,8 @@ data class Alert(
BUCKET_KEYS to aggregationResultBucket?.bucketKeys?.joinToString(","),
PARENTS_BUCKET_PATH to aggregationResultBucket?.parentBucketPath,
FINDING_IDS to findingIds.joinToString(","),
RELATED_DOC_IDS to relatedDocIds.joinToString(",")
RELATED_DOC_IDS to relatedDocIds.joinToString(","),
CLUSTERS_FIELD to clusters?.joinToString(",")
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ val ILLEGAL_PATH_PARAMETER_CHARACTERS = arrayOf(':', '"', '+', '\\', '|', '?', '
data class ClusterMetricsInput(
var path: String,
var pathParams: String = "",
var url: String
var url: String,
var clusters: List<String> = listOf()
) : Input {
val clusterMetricType: ClusterMetricType
val constructedUri: URI
Expand Down Expand Up @@ -74,6 +75,7 @@ data class ClusterMetricsInput(
.field(PATH_FIELD, path)
.field(PATH_PARAMS_FIELD, pathParams)
.field(URL_FIELD, url)
.field(CLUSTERS_FIELD, clusters)
.endObject()
.endObject()
}
Expand All @@ -87,6 +89,7 @@ data class ClusterMetricsInput(
out.writeString(path)
out.writeString(pathParams)
out.writeString(url)
out.writeStringArray(clusters.toTypedArray())
}

companion object {
Expand All @@ -99,6 +102,7 @@ data class ClusterMetricsInput(
const val PATH_PARAMS_FIELD = "path_params"
const val URL_FIELD = "url"
const val URI_FIELD = "uri"
const val CLUSTERS_FIELD = "clusters"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(Input::class.java, ParseField(URI_FIELD), CheckedFunction { parseInner(it) })

Expand All @@ -110,6 +114,7 @@ data class ClusterMetricsInput(
var path = ""
var pathParams = ""
var url = ""
val clusters = mutableListOf<String>()

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)

Expand All @@ -120,9 +125,17 @@ data class ClusterMetricsInput(
PATH_FIELD -> path = xcp.text()
PATH_PARAMS_FIELD -> pathParams = xcp.text()
URL_FIELD -> url = xcp.text()
CLUSTERS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) clusters.add(xcp.text())
}
}
}
return ClusterMetricsInput(path, pathParams, url)
return ClusterMetricsInput(path, pathParams, url, clusters)
}
}

Expand Down Expand Up @@ -164,7 +177,7 @@ data class ClusterMetricsInput(
ILLEGAL_PATH_PARAMETER_CHARACTERS.forEach { character ->
if (pathParams.contains(character))
throw IllegalArgumentException(
"The provided path parameters contain invalid characters or spaces. Please omit: " + "${ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")}"
"The provided path parameters contain invalid characters or spaces. Please omit: " + ILLEGAL_PATH_PARAMETER_CHARACTERS.joinToString(" ")
)
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/test/kotlin/org/opensearch/commons/alerting/AlertTests.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
}

@Test
Expand All @@ -40,6 +41,7 @@ class AlertTests {
assertEquals(templateArgs[Alert.START_TIME_FIELD], alert.startTime.toEpochMilli(), "Template args start time does not")
assertEquals(templateArgs[Alert.LAST_NOTIFICATION_TIME_FIELD], null, "Template args last notification time does not match")
assertEquals(templateArgs[Alert.SEVERITY_FIELD], alert.severity, "Template args severity does not match")
assertEquals(templateArgs[Alert.CLUSTERS_FIELD], alert.clusters?.joinToString(","), "Template args clusters does not match")
assertEquals(
templateArgs[Alert.BUCKET_KEYS],
alert.aggregationResultBucket?.bucketKeys?.joinToString(","),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,11 @@ fun assertUserNull(monitor: Monitor) {
fun randomAlert(monitor: Monitor = randomQueryLevelMonitor()): Alert {
val trigger = randomQueryLevelTrigger()
val actionExecutionResults = mutableListOf(randomActionExecutionResult(), randomActionExecutionResult())
val clusterCount = (-1..5).random()
val clusters = if (clusterCount == -1) null else (0..clusterCount).map { "index-$it" }
return Alert(
monitor, trigger, Instant.now().truncatedTo(ChronoUnit.MILLIS), null,
actionExecutionResults = actionExecutionResults
actionExecutionResults = actionExecutionResults, clusters = clusters
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ class XContentTests {
errorMessage = "some error",
lastNotificationTime = Instant.now(),
workflowId = "",
executionId = ""
executionId = "",
clusters = listOf()
)
assertEquals("Round tripping alert doesn't work", alert.triggerName, "NoOp trigger")
}
Expand All @@ -462,7 +463,8 @@ class XContentTests {
"\"state\":\"ACTIVE\",\"error_message\":null,\"alert_history\":[],\"severity\":\"1\",\"action_execution_results\"" +
":[{\"action_id\":\"ghe1-XQBySl0wQKDBkOG\",\"last_execution_time\":1601917224583,\"throttled_count\":-1478015168}," +
"{\"action_id\":\"gxe1-XQBySl0wQKDBkOH\",\"last_execution_time\":1601917224583,\"throttled_count\":-768533744}]," +
"\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}"
"\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null," +
"\"clusters\":[\"cluster-1\",\"cluster-2\"]}"
val parsedAlert = Alert.parse(parser(alertStr))
OpenSearchTestCase.assertNull(parsedAlert.monitorUser)
}
Expand All @@ -475,7 +477,8 @@ class XContentTests {
"\"state\":\"ACTIVE\",\"error_message\":null,\"alert_history\":[],\"severity\":\"1\",\"action_execution_results\"" +
":[{\"action_id\":\"ghe1-XQBySl0wQKDBkOG\",\"last_execution_time\":1601917224583,\"throttled_count\":-1478015168}," +
"{\"action_id\":\"gxe1-XQBySl0wQKDBkOH\",\"last_execution_time\":1601917224583,\"throttled_count\":-768533744}]," +
"\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null}"
"\"start_time\":1601917224599,\"last_notification_time\":null,\"end_time\":null,\"acknowledged_time\":null," +
"\"clusters\":[\"cluster-1\",\"cluster-2\"]}"
val parsedAlert = Alert.parse(parser(alertStr))
OpenSearchTestCase.assertNull(parsedAlert.monitorUser)
}
Expand Down

0 comments on commit 0c23cc5

Please sign in to comment.