Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support list of monitor ids in Chained Monitor Findings #514

Merged
merged 4 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class IndexWorkflowRequest : ActionRequest {
refreshPolicy: WriteRequest.RefreshPolicy,
method: RestRequest.Method,
workflow: Workflow,
rbacRoles: List<String>? = null
rbacRoles: List<String>? = null,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the , required?

Copy link
Member Author

@eirsep eirsep Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted

) : super() {
this.workflowId = workflowId
this.seqNo = seqNo
Expand Down Expand Up @@ -105,19 +105,41 @@ class IndexWorkflowRequest : ActionRequest {
val monitorIdOrderMap: Map<String, Int> = delegates.associate { it.monitorId to it.order }
delegates.forEach {
if (it.chainedMonitorFindings != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)

if (it.chainedMonitorFindings.monitorId != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)
}
} else {
for (monitorId in it.chainedMonitorFindings.monitorIds) {
if (!monitorIdOrderMap.containsKey(monitorId)) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor $monitorId doesn't exist in sequence",
validationException
)
return validationException
} else {
val order = monitorIdOrderMap.get(monitorId)!!
if (order >= it.order) {
return ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}. " +
"Order of monitor being chained [$order] should be smaller than order of monitor using findings as source data [${it.order}] in sequence",
validationException
)
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,89 @@ import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.util.Collections

/**
* Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id.
* Context passed in delegate monitor to filter data matched by a list of monitors based on the findings of the given monitor ids.
*/
// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties
data class ChainedMonitorFindings(
val monitorId: String
val monitorId: String? = null,
val monitorIds: List<String> = emptyList(), // if monitorId field is non-null it would be given precendence for BWC
) : BaseModel {

init {
validateId(monitorId)
require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) {
"at least one of fields, 'monitorIds' and 'monitorId' should be provided"
}
if (monitorId != null && monitorId.isBlank()) {
validateId(monitorId)
} else {
monitorIds.forEach { validateId(it) }
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // monitorId
sin.readOptionalString(), // monitorId
Collections.unmodifiableList(sin.readStringList())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use kotlin toList here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any specific reason to use toList?
Collections.unmodifiableList has been used across the code base to wrap string lists read from stream input

)

@Suppress("UNCHECKED_CAST")
fun asTemplateArg(): Map<String, Any> {
return mapOf(
MONITOR_ID_FIELD to monitorId,
)
MONITOR_IDS_FIELD to monitorIds
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible we merge these 2 fields to 1? it looks little complex that we have both

MONITOR_ID_FIELD to monitorId,
MONITOR_IDS_FIELD to monitorIds```

Copy link
Member Author

@eirsep eirsep Aug 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have both for backward compatibility.

) as Map<String, Any>
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
out.writeOptionalString(monitorId)
out.writeStringCollection(monitorIds)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(MONITOR_ID_FIELD, monitorId)
.field(MONITOR_IDS_FIELD, monitorIds)
.endObject()
return builder
}

companion object {
const val MONITOR_ID_FIELD = "monitor_id"
const val MONITOR_IDS_FIELD = "monitor_ids"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ChainedMonitorFindings {
lateinit var monitorId: String

var monitorId: String? = null
val monitorIds = mutableListOf<String>()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
MONITOR_ID_FIELD -> {
monitorId = xcp.text()
validateId(monitorId)
if (!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL))
monitorId = xcp.text()
}

MONITOR_IDS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
monitorIds.add(xcp.text())
}
}
}
}
return ChainedMonitorFindings(monitorId)
return ChainedMonitorFindings(monitorId, monitorIds)
}

@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ fun randomClusterMetricsInput(
return ClusterMetricsInput(path, pathParams, url)
}

fun ChainedMonitorFindings.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun Workflow.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.opensearch.search.SearchModule
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.util.UUID
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue

class IndexWorkflowRequestTests {

Expand Down Expand Up @@ -196,6 +199,21 @@ class IndexWorkflowRequestTests {
delegates = listOf(
Delegate(1, "monitor-1")
)

// Chained finding list of monitors valid
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(2, "monitor-2"),
Delegate(3, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),

)
val req7 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
assertNull(req7.validate())
try {
IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
Expand All @@ -207,5 +225,21 @@ class IndexWorkflowRequestTests {
Assert.assertTrue(ex is IllegalArgumentException)
Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input."))
}

// Chained finding list of monitors invalid order and old field null
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(3, "monitor-2"),
Delegate(2, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),

)
val req8 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
assertNotNull(req8.validate())
assertTrue(req8.validate()!!.message!!.contains("should be executed before monitor"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,25 @@ class CompositeInputTests {
}

@Test
fun `test create Chained Findings with illegal monitorId value`() {
fun `test create Chained Findings with illegal monitorId value and empty monitorIds list`() {
try {
ChainedMonitorFindings("")
Assertions.fail("Expecting an illegal argument exception")
} catch (e: IllegalArgumentException) {
e.message?.let {
Assertions.assertTrue(
it.contains("at least one of fields, 'monitorIds' and 'monitorId' should be provided")

)
}
}
}

@Test
fun `test create Chained Findings with null monitorId value and monitorIds list with blank monitorIds`() {
try {
ChainedMonitorFindings("", listOf("", ""))
Assertions.fail("Expecting an illegal argument exception")
} catch (e: IllegalArgumentException) {
e.message?.let {
Assertions.assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,27 @@ class XContentTests {
@Test
fun `test workflow parsing`() {
val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3"))

val monitorString = workflow.toJsonString()
val parsedWorkflow = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed")
}

@Test
fun `test chainedMonitorFindings parsing`() {
val cmf1 = ChainedMonitorFindings(monitorId = "m1")
val cmf1String = cmf1.toJsonString()
Assertions.assertEquals(
ChainedMonitorFindings.parse(parser(cmf1String)), cmf1,
"Round tripping chained monitor findings failed"
)
val cmf2 = ChainedMonitorFindings(monitorIds = listOf("m1", "m2"))
val cmf2String = cmf2.toJsonString()
Assertions.assertEquals(
ChainedMonitorFindings.parse(parser(cmf2String)), cmf2,
"Round tripping chained monitor findings failed"
)
}

@Test
fun `test old monitor format parsing`() {
val monitorString = """
Expand Down