Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.9] Backport 2.x - add auditDelegateMonitorAlerts flag (#476) #478

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
val severityLevel: String
val alertState: String
val alertIndex: String?
val associatedAlertsIndex: String?
val monitorIds: List<String>?
val workflowIds: List<String>?
val alertIds: List<String>?
Expand All @@ -22,15 +23,17 @@ class GetWorkflowAlertsRequest : ActionRequest {
severityLevel: String,
alertState: String,
alertIndex: String?,
associatedAlertsIndex: String?,
monitorIds: List<String>? = null,
workflowIds: List<String>? = null,
alertIds: List<String>? = null,
getAssociatedAlerts: Boolean
getAssociatedAlerts: Boolean,
) : super() {
this.table = table
this.severityLevel = severityLevel
this.alertState = alertState
this.alertIndex = alertIndex
this.associatedAlertsIndex = associatedAlertsIndex
this.monitorIds = monitorIds
this.workflowIds = workflowIds
this.alertIds = alertIds
Expand All @@ -43,6 +46,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
severityLevel = sin.readString(),
alertState = sin.readString(),
alertIndex = sin.readOptionalString(),
associatedAlertsIndex = sin.readOptionalString(),
monitorIds = sin.readOptionalStringList(),
workflowIds = sin.readOptionalStringList(),
alertIds = sin.readOptionalStringList(),
Expand All @@ -59,6 +63,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
out.writeString(severityLevel)
out.writeString(alertState)
out.writeOptionalString(alertIndex)
out.writeOptionalString(associatedAlertsIndex)
out.writeOptionalStringCollection(monitorIds)
out.writeOptionalStringCollection(workflowIds)
out.writeOptionalStringCollection(alertIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data class Alert(
) : Writeable, ToXContent {

init {
if (errorMessage != null) require(state == State.DELETED || state == State.ERROR) {
if (errorMessage != null) require(state == State.DELETED || state == State.ERROR || state == State.AUDIT) {
"Attempt to create an alert with an error in state: $state"
}
}
Expand Down Expand Up @@ -421,7 +421,9 @@ data class Alert(
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue()
MONITOR_USER_FIELD -> monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp)
MONITOR_USER_FIELD ->
monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null
else User.parse(xcp)
TRIGGER_ID_FIELD -> triggerId = xcp.text()
FINDING_IDS -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
Expand Down
18 changes: 14 additions & 4 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ data class Workflow(
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<WorkflowInput>,
val owner: String? = DEFAULT_OWNER,
val triggers: List<Trigger>
val triggers: List<Trigger>,
val auditDelegateMonitorAlerts: Boolean? = true,
) : ScheduledJob {
override val type = WORKFLOW_TYPE

Expand Down Expand Up @@ -70,7 +71,8 @@ data class Workflow(
schemaVersion = sin.readInt(),
inputs = sin.readList((WorkflowInput)::readFrom),
owner = sin.readOptionalString(),
triggers = sin.readList((Trigger)::readFrom)
triggers = sin.readList((Trigger)::readFrom),
auditDelegateMonitorAlerts = sin.readOptionalBoolean()
)

// This enum classifies different workflows
Expand Down Expand Up @@ -99,7 +101,7 @@ data class Workflow(
private fun createXContentBuilder(
builder: XContentBuilder,
params: ToXContent.Params,
secure: Boolean
secure: Boolean,
): XContentBuilder {
builder.startObject()
if (params.paramAsBoolean("with_type", false)) builder.startObject(type)
Expand All @@ -119,6 +121,9 @@ data class Workflow(
.field(TRIGGERS_FIELD, triggers.toTypedArray())
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
builder.field(OWNER_FIELD, owner)
if (auditDelegateMonitorAlerts != null) {
builder.field(AUDIT_DELEGATE_MONITOR_ALERTS_FIELD, auditDelegateMonitorAlerts)
}
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -159,6 +164,7 @@ data class Workflow(
}
it.writeTo(out)
}
out.writeOptionalBoolean(auditDelegateMonitorAlerts)
}

companion object {
Expand All @@ -177,6 +183,7 @@ data class Workflow(
const val ENABLED_TIME_FIELD = "enabled_time"
const val TRIGGERS_FIELD = "triggers"
const val OWNER_FIELD = "owner"
const val AUDIT_DELEGATE_MONITOR_ALERTS_FIELD = "audit_delegate_monitor_alerts"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
// the different subclasses and creating circular dependencies
Expand All @@ -201,6 +208,7 @@ data class Workflow(
val inputs: MutableList<WorkflowInput> = mutableListOf()
val triggers: MutableList<Trigger> = mutableListOf()
var owner = DEFAULT_OWNER
var auditDelegateMonitorAlerts = true

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -245,6 +253,7 @@ data class Workflow(
}
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
AUDIT_DELEGATE_MONITOR_ALERTS_FIELD -> auditDelegateMonitorAlerts = xcp.booleanValue()
OWNER_FIELD -> {
owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
}
Expand Down Expand Up @@ -272,7 +281,8 @@ data class Workflow(
schemaVersion,
inputs.toList(),
owner,
triggers
triggers,
auditDelegateMonitorAlerts
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fun randomWorkflow(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
triggers: List<Trigger> = listOf(randomChainedAlertTrigger()),
auditDelegateMonitorAlerts: Boolean? = true
): Workflow {
val delegates = mutableListOf<Delegate>()
if (!monitorIds.isNullOrEmpty()) {
Expand All @@ -195,7 +196,7 @@ fun randomWorkflow(
return Workflow(
name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input,
schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
triggers = triggers
triggers = triggers, auditDelegateMonitorAlerts = auditDelegateMonitorAlerts
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal class GetWorkflowAlertsRequestTests {
workflowIds = listOf("w1", "w2"),
alertIds = emptyList(),
alertIndex = null,
associatedAlertsIndex = null,
monitorIds = emptyList()
)
assertNotNull(req)
Expand All @@ -41,6 +42,42 @@ internal class GetWorkflowAlertsRequestTests {
assertTrue(newReq.alertIds!!.isEmpty())
assertTrue(newReq.monitorIds!!.isEmpty())
assertNull(newReq.alertIndex)
assertNull(newReq.associatedAlertsIndex)
assertTrue(newReq.getAssociatedAlerts)
}

@Test
fun `test get alerts request with custom alerts and associated alerts indices`() {

val table = Table("asc", "sortString", null, 1, 0, "")

val req = GetWorkflowAlertsRequest(
table = table,
severityLevel = "1",
alertState = "active",
getAssociatedAlerts = true,
workflowIds = listOf("w1", "w2"),
alertIds = emptyList(),
alertIndex = "alertIndex",
associatedAlertsIndex = "associatedAlertsIndex",
monitorIds = emptyList()
)
assertNotNull(req)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newReq = GetWorkflowAlertsRequest(sin)

assertEquals("1", newReq.severityLevel)
assertEquals("active", newReq.alertState)
assertEquals(table, newReq.table)
assertTrue(newReq.workflowIds!!.contains("w1"))
assertTrue(newReq.workflowIds!!.contains("w2"))
assertTrue(newReq.alertIds!!.isEmpty())
assertTrue(newReq.monitorIds!!.isEmpty())
assertEquals(newReq.alertIndex, "alertIndex")
assertEquals(newReq.associatedAlertsIndex, "associatedAlertsIndex")
assertTrue(newReq.getAssociatedAlerts)
}

Expand All @@ -55,7 +92,8 @@ internal class GetWorkflowAlertsRequestTests {
getAssociatedAlerts = true,
workflowIds = listOf("w1, w2"),
alertIds = emptyList(),
alertIndex = null
alertIndex = null,
associatedAlertsIndex = null
)
assertNotNull(req)
assertNull(req.validate())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.opensearch.commons.alerting.action

import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.randomDelegate
import org.opensearch.commons.alerting.randomUser
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit

class GetWorkflowResponseTests {

@Test
fun testGetWorkflowResponse() {
val workflow = randomWorkflow(auditDelegateMonitorAlerts = false)
val response = GetWorkflowResponse(
id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = workflow
)
val out = BytesStreamOutput()
response.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newRes = GetWorkflowResponse(sin)
Assertions.assertEquals("id", newRes.id)
Assertions.assertFalse(newRes.workflow!!.auditDelegateMonitorAlerts!!)
Assertions.assertEquals(workflow.name, newRes.workflow!!.name)
Assertions.assertEquals(workflow.owner, newRes.workflow!!.owner)
}

@Test
fun testGetWorkflowResponseWhereAuditDelegateMonitorAlertsFlagIsNotSet() {
val workflow = Workflow(
id = "",
version = Workflow.NO_VERSION,
name = "test",
enabled = true,
schemaVersion = 2,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES),
lastUpdateTime = Instant.now(),
enabledTime = Instant.now(),
workflowType = Workflow.WorkflowType.COMPOSITE,
user = randomUser(),
inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(randomDelegate())))),
owner = "",
triggers = listOf()
)
val response = GetWorkflowResponse(
id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = workflow
)
val out = BytesStreamOutput()
response.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newRes = GetWorkflowResponse(sin)
Assertions.assertEquals("id", newRes.id)
Assertions.assertTrue(newRes.workflow!!.auditDelegateMonitorAlerts!!)
Assertions.assertEquals(workflow.name, newRes.workflow!!.name)
Assertions.assertEquals(workflow.owner, newRes.workflow!!.owner)
Assertions.assertEquals(workflow.auditDelegateMonitorAlerts, newRes.workflow!!.auditDelegateMonitorAlerts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomWorkflow()
randomWorkflow(auditDelegateMonitorAlerts = false)
)
Assertions.assertNotNull(req)

Expand All @@ -42,6 +42,7 @@ class IndexWorkflowRequestTests {
Assertions.assertEquals(2L, newReq.primaryTerm)
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
Assertions.assertNotNull(newReq.workflow)
Assertions.assertFalse(newReq.workflow.auditDelegateMonitorAlerts!!)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ class XContentTests {
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
}

@Test
fun `test composite workflow parsing with auditDelegateMonitorAlerts flag disabled`() {
val workflow = randomWorkflow(auditDelegateMonitorAlerts = false)
val monitorString = workflow.toJsonStringWithUser()
val parsedMonitor = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
}

@Test
fun `test query-level trigger parsing`() {
val trigger = randomQueryLevelTrigger()
Expand Down