Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CorrelationAlert model added #631

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 208 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/BaseAlert.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package org.opensearch.commons.alerting.model

import org.opensearch.common.lucene.uid.Versions
import org.opensearch.commons.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.opensearch.commons.alerting.util.instant
import org.opensearch.commons.alerting.util.optionalTimeField
import org.opensearch.commons.alerting.util.optionalUserField
import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.time.Instant

/** CorrelationAlert and Alert can extend the UnifiedAlert class to inherit the common fields and behavior
* of UnifiedAlert class.
*/
open class BaseAlert(
open val id: String = Alert.NO_ID,
open val version: Long = Alert.NO_VERSION,
open val schemaVersion: Int = NO_SCHEMA_VERSION,
open val user: User?,
open val triggerName: String,

// State will be later moved to this Class (after `monitorBasedAlerts` extend this Class)
open val state: Alert.State,
open val startTime: Instant,
open val endTime: Instant? = null,
open val acknowledgedTime: Instant? = null,
open val errorMessage: String? = null,
open val severity: String,
open val actionExecutionResults: List<ActionExecutionResult>
) : Writeable, ToXContent {

init {
if (errorMessage != null) {
require((state == Alert.State.DELETED) || (state == Alert.State.ERROR) || (state == Alert.State.AUDIT)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would alert state Deleted mandate a non-null error message

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah this one's in Alert state class also, so eventually Monitor based Alert (i.e Alert class) will also extend this BaseClass, therefore.

"Attempt to create an alert with an error in state: $state"
}
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, but here src/test/kotlin/org/opensearch/commons/alerting/CorrelationAlertTests.kt in test correlation parse function we are validating such scenarios

id = sin.readString(),
version = sin.readLong(),
schemaVersion = sin.readInt(),
user = if (sin.readBoolean()) {
User(sin)
} else {
null
},
triggerName = sin.readString(),
state = sin.readEnum(Alert.State::class.java),
startTime = sin.readInstant(),
endTime = sin.readOptionalInstant(),
acknowledgedTime = sin.readOptionalInstant(),
errorMessage = sin.readOptionalString(),
severity = sin.readString(),
actionExecutionResults = sin.readList(::ActionExecutionResult)
)

fun isAcknowledged(): Boolean = (state == Alert.State.ACKNOWLEDGED)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeInt(schemaVersion)
out.writeBoolean(user != null)
user?.writeTo(out)
out.writeString(triggerName)
out.writeEnum(state)
out.writeInstant(startTime)
out.writeOptionalInstant(endTime)
out.writeOptionalInstant(acknowledgedTime)
out.writeOptionalString(errorMessage)
out.writeString(severity)
out.writeCollection(actionExecutionResults)
}

companion object {
const val ALERT_ID_FIELD = "id"
const val SCHEMA_VERSION_FIELD = "schemaVersion"
const val ALERT_VERSION_FIELD = "version"
const val USER_FIELD = "user"
const val TRIGGER_NAME_FIELD = "triggerName"
const val STATE_FIELD = "state"
const val START_TIME_FIELD = "startTime"
const val END_TIME_FIELD = "endTime"
const val ACKNOWLEDGED_TIME_FIELD = "acknowledgedTime"
const val ERROR_MESSAGE_FIELD = "errorMessage"
const val SEVERITY_FIELD = "severity"
const val ACTION_EXECUTION_RESULTS_FIELD = "actionExecutionResults"
const val NO_ID = ""
const val NO_VERSION = Versions.NOT_FOUND

@JvmStatic
@JvmOverloads
@Throws(IOException::class)
fun parse(xcp: XContentParser, version: Long = NO_VERSION): BaseAlert {
lateinit var id: String
var schemaVersion = NO_SCHEMA_VERSION
var version: Long = Versions.NOT_FOUND
var user: User? = null
lateinit var triggerName: String
lateinit var state: Alert.State
lateinit var startTime: Instant
lateinit var severity: String
var endTime: Instant? = null
var acknowledgedTime: Instant? = null
var errorMessage: String? = null
val actionExecutionResults: MutableList<ActionExecutionResult> = mutableListOf()
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()
when (fieldName) {
USER_FIELD -> user = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp)
ALERT_ID_FIELD -> id = xcp.text()
ALERT_VERSION_FIELD -> version = xcp.longValue()
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
TRIGGER_NAME_FIELD -> triggerName = xcp.text()
STATE_FIELD -> state = Alert.State.valueOf(xcp.text())
ERROR_MESSAGE_FIELD -> errorMessage = xcp.textOrNull()
SEVERITY_FIELD -> severity = xcp.text()
ACTION_EXECUTION_RESULTS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actionExecutionResults.add(ActionExecutionResult.parse(xcp))
}
}
START_TIME_FIELD -> startTime = requireNotNull(xcp.instant())
END_TIME_FIELD -> endTime = xcp.instant()
ACKNOWLEDGED_TIME_FIELD -> acknowledgedTime = xcp.instant()
}
}

return BaseAlert(
id = id,
startTime = requireNotNull(startTime),
endTime = endTime,
state = requireNotNull(state),
version = version,
errorMessage = errorMessage,
actionExecutionResults = actionExecutionResults,
schemaVersion = schemaVersion,
user = user,
triggerName = requireNotNull(triggerName),
severity = severity,
acknowledgedTime = acknowledgedTime
)
}

@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): Alert {
return Alert(sin)
}
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return createXContentBuilder(builder, true)
}

fun toXContentWithUser(builder: XContentBuilder): XContentBuilder {
return createXContentBuilder(builder, false)
}

fun createXContentBuilder(builder: XContentBuilder, secure: Boolean): XContentBuilder {
if (!secure) {
builder.optionalUserField(USER_FIELD, user)
}
builder.field(ALERT_ID_FIELD, id)
.field(ALERT_VERSION_FIELD, version)
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(TRIGGER_NAME_FIELD, triggerName)
.field(STATE_FIELD, state)
.field(ERROR_MESSAGE_FIELD, errorMessage)
.field(SEVERITY_FIELD, severity)
.field(ACTION_EXECUTION_RESULTS_FIELD, actionExecutionResults.toTypedArray())
.optionalTimeField(START_TIME_FIELD, startTime)
.optionalTimeField(END_TIME_FIELD, endTime)
.optionalTimeField(ACKNOWLEDGED_TIME_FIELD, acknowledgedTime)
return builder
}

open fun asTemplateArg(): Map<String, Any?> {
return mapOf(
ACKNOWLEDGED_TIME_FIELD to acknowledgedTime?.toEpochMilli(),
ALERT_ID_FIELD to id,
ALERT_VERSION_FIELD to version,
END_TIME_FIELD to endTime?.toEpochMilli(),
ERROR_MESSAGE_FIELD to errorMessage,
SEVERITY_FIELD to severity,
START_TIME_FIELD to startTime.toEpochMilli(),
STATE_FIELD to state.toString(),
TRIGGER_NAME_FIELD to triggerName
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package org.opensearch.commons.alerting.model

import org.opensearch.commons.authuser.User
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.time.Instant

class CorrelationAlert : BaseAlert {

// CorrelationAlert-specific properties
val correlatedFindingIds: List<String>
val correlationRuleId: String
val correlationRuleName: String

constructor(
correlatedFindingIds: List<String>,
correlationRuleId: String,
correlationRuleName: String,
id: String,
version: Long,
schemaVersion: Int,
user: User?,
triggerName: String,
state: Alert.State,
startTime: Instant,
endTime: Instant?,
acknowledgedTime: Instant?,
errorMessage: String?,
severity: String,
actionExecutionResults: List<ActionExecutionResult>
) : super(
id = id,
version = version,
schemaVersion = schemaVersion,
user = user,
triggerName = triggerName,
state = state,
startTime = startTime,
endTime = endTime,
acknowledgedTime = acknowledgedTime,
errorMessage = errorMessage,
severity = severity,
actionExecutionResults = actionExecutionResults
) {
this.correlatedFindingIds = correlatedFindingIds
this.correlationRuleId = correlationRuleId
this.correlationRuleName = correlationRuleName
}

@Throws(IOException::class)
constructor(sin: StreamInput) : super(sin) {
correlatedFindingIds = sin.readStringList()
correlationRuleId = sin.readString()
correlationRuleName = sin.readString()
}

// Override to include CorrelationAlert specific fields
fun toXContent(builder: XContentBuilder): XContentBuilder {
builder.startObject()
.startArray(CORRELATED_FINDING_IDS)
correlatedFindingIds.forEach { id ->
builder.value(id)
}
builder.endArray()
.field(CORRELATION_RULE_ID, correlationRuleId)
.field(CORRELATION_RULE_NAME, correlationRuleName)
super.toXContentWithUser(builder)
builder.endObject()
return builder
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeStringCollection(correlatedFindingIds)
out.writeString(correlationRuleId)
out.writeString(correlationRuleName)
}
override fun asTemplateArg(): Map<String, Any?> {
val superTemplateArgs = super.asTemplateArg()
val correlationSpecificArgs = mapOf(
CORRELATED_FINDING_IDS to correlatedFindingIds,
CORRELATION_RULE_ID to correlationRuleId,
CORRELATION_RULE_NAME to correlationRuleName
)
return superTemplateArgs + correlationSpecificArgs
}
companion object {
const val CORRELATED_FINDING_IDS = "correlatedFindingIds"
const val CORRELATION_RULE_ID = "correlationRuleId"
const val CORRELATION_RULE_NAME = "correlationRuleName"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): CorrelationAlert {
// Parse additional CorrelationAlert-specific fields
val correlatedFindingIds: MutableList<String> = mutableListOf()
var correlationRuleId: String? = null
var correlationRuleName: String? = null
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
CORRELATED_FINDING_IDS -> {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
correlatedFindingIds.add(xcp.text())
}
}
CORRELATION_RULE_ID -> correlationRuleId = xcp.text()
CORRELATION_RULE_NAME -> correlationRuleName = xcp.text()
}
}

val unifiedAlert = parse(xcp, version)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which parse() method is called? Recursive?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, so in this case, the parse function of CorrelationAlert is utilizing the parse function of its superclass BaseAlert to handle the parsing of fields common to both types of alerts. Then, it proceeds to parse the additional fields specific to CorrelationAlert

return CorrelationAlert(
correlatedFindingIds = correlatedFindingIds,
correlationRuleId = requireNotNull(correlationRuleId),
correlationRuleName = requireNotNull(correlationRuleName),
id = requireNotNull(unifiedAlert.id),
version = requireNotNull(unifiedAlert.version),
schemaVersion = requireNotNull(unifiedAlert.schemaVersion),
user = unifiedAlert.user,
triggerName = requireNotNull(unifiedAlert.triggerName),
state = requireNotNull(unifiedAlert.state),
startTime = requireNotNull(unifiedAlert.startTime),
endTime = unifiedAlert.endTime,
acknowledgedTime = unifiedAlert.acknowledgedTime,
errorMessage = unifiedAlert.errorMessage,
severity = requireNotNull(unifiedAlert.severity),
actionExecutionResults = unifiedAlert.actionExecutionResults
)
}
}
}
Loading
Loading