Skip to content

Commit

Permalink
changes to support generic inputs and triggers in remote monitors (#664)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored and jowg-amazon committed Jul 2, 2024
1 parent 2f63b00 commit 0b5281c
Show file tree
Hide file tree
Showing 9 changed files with 335 additions and 9 deletions.
10 changes: 8 additions & 2 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Input.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package org.opensearch.commons.alerting.model
import org.opensearch.commons.alerting.model.ClusterMetricsInput.Companion.URI_FIELD
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
import org.opensearch.commons.alerting.model.SearchInput.Companion.SEARCH_FIELD
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput.Companion.REMOTE_MONITOR_INPUT_FIELD
import org.opensearch.commons.notifications.model.BaseModel
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
Expand All @@ -14,7 +16,8 @@ interface Input : BaseModel {
enum class Type(val value: String) {
DOCUMENT_LEVEL_INPUT(DOC_LEVEL_INPUT_FIELD),
CLUSTER_METRICS_INPUT(URI_FIELD),
SEARCH_INPUT(SEARCH_FIELD);
SEARCH_INPUT(SEARCH_FIELD),
REMOTE_MONITOR_INPUT(REMOTE_MONITOR_INPUT_FIELD);

override fun toString(): String {
return value
Expand All @@ -32,8 +35,10 @@ interface Input : BaseModel {
SearchInput.parseInner(xcp)
} else if (xcp.currentName() == Type.CLUSTER_METRICS_INPUT.value) {
ClusterMetricsInput.parseInner(xcp)
} else {
} else if (xcp.currentName() == Type.DOCUMENT_LEVEL_INPUT.value) {
DocLevelMonitorInput.parse(xcp)
} else {
RemoteMonitorInput.parse(xcp)
}
XContentParserUtils.ensureExpectedToken(XContentParser.Token.END_OBJECT, xcp.nextToken(), xcp)
return input
Expand All @@ -46,6 +51,7 @@ interface Input : BaseModel {
Type.DOCUMENT_LEVEL_INPUT -> DocLevelMonitorInput(sin)
Type.CLUSTER_METRICS_INPUT -> ClusterMetricsInput(sin)
Type.SEARCH_INPUT -> SearchInput(sin)
Type.REMOTE_MONITOR_INPUT -> RemoteMonitorInput(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.opensearch.commons.alerting.model

import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.commons.notifications.model.BaseModel
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.xcontent.XContentParser
Expand All @@ -14,7 +15,8 @@ interface Trigger : BaseModel {
QUERY_LEVEL_TRIGGER(QueryLevelTrigger.QUERY_LEVEL_TRIGGER_FIELD),
BUCKET_LEVEL_TRIGGER(BucketLevelTrigger.BUCKET_LEVEL_TRIGGER_FIELD),
NOOP_TRIGGER(NoOpTrigger.NOOP_TRIGGER_FIELD),
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD);
CHAINED_ALERT_TRIGGER(ChainedAlertTrigger.CHAINED_ALERT_TRIGGER_FIELD),
REMOTE_MONITOR_TRIGGER(RemoteMonitorTrigger.REMOTE_MONITOR_TRIGGER_FIELD);

override fun toString(): String {
return value
Expand Down Expand Up @@ -55,6 +57,7 @@ interface Trigger : BaseModel {
Type.BUCKET_LEVEL_TRIGGER -> BucketLevelTrigger(sin)
Type.DOCUMENT_LEVEL_TRIGGER -> DocumentLevelTrigger(sin)
Type.CHAINED_ALERT_TRIGGER -> ChainedAlertTrigger(sin)
Type.REMOTE_MONITOR_TRIGGER -> RemoteMonitorTrigger(sin)
// This shouldn't be reachable but ensuring exhaustiveness as Kotlin warns
// enum can be null in Java
else -> throw IllegalStateException("Unexpected input [$type] when reading Trigger")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.opensearch.commons.alerting.model.remote.monitors

import org.opensearch.commons.alerting.model.Input
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.nio.ByteBuffer

data class RemoteMonitorInput(val input: BytesReference) : Input {

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readBytesReference()
)

fun asTemplateArg(): Map<String, Any?> {
val bytes = input.toBytesRef().bytes
return mapOf(
INPUT_SIZE to bytes.size,
INPUT_FIELD to bytes
)
}

override fun name(): String {
return REMOTE_MONITOR_INPUT_FIELD
}

override fun writeTo(out: StreamOutput) {
out.writeBytesReference(input)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val bytes = input.toBytesRef().bytes
return builder.startObject()
.startObject(REMOTE_MONITOR_INPUT_FIELD)
.field(INPUT_SIZE, bytes.size)
.field(INPUT_FIELD, bytes)
.endObject()
.endObject()
}

companion object {
const val INPUT_FIELD = "input"
const val INPUT_SIZE = "size"
const val REMOTE_MONITOR_INPUT_FIELD = "remote_monitor_input"

fun parse(xcp: XContentParser): RemoteMonitorInput {
var bytes: ByteArray? = null
var size: Int = 0

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
INPUT_FIELD -> bytes = xcp.binaryValue()
INPUT_SIZE -> size = xcp.intValue()
}
}
val input = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size))
return RemoteMonitorInput(input)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package org.opensearch.commons.alerting.model.remote.monitors

import org.opensearch.common.CheckedFunction
import org.opensearch.common.UUIDs
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.Action
import org.opensearch.core.ParseField
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.nio.ByteBuffer

data class RemoteMonitorTrigger(
override val id: String,
override val name: String,
override val severity: String,
override val actions: List<Action>,
val trigger: BytesReference
) : Trigger {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString(),
sin.readString(),
sin.readList(::Action),
sin.readBytesReference()
)

fun asTemplateArg(): Map<String, Any?> {
val bytes = trigger.toBytesRef().bytes
return mapOf(
Trigger.ID_FIELD to id,
Trigger.NAME_FIELD to name,
Trigger.SEVERITY_FIELD to severity,
Trigger.ACTIONS_FIELD to actions.map { it.asTemplateArg() },
TRIGGER_SIZE to bytes.size,
TRIGGER_FIELD to bytes
)
}

override fun name(): String {
return REMOTE_MONITOR_TRIGGER_FIELD
}

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
out.writeString(severity)
out.writeCollection(actions)
out.writeBytesReference(trigger)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
val bytes = trigger.toBytesRef().bytes
return builder.startObject()
.startObject(REMOTE_MONITOR_TRIGGER_FIELD)
.field(Trigger.ID_FIELD, id)
.field(Trigger.NAME_FIELD, name)
.field(Trigger.SEVERITY_FIELD, severity)
.field(Trigger.ACTIONS_FIELD, actions.toTypedArray())
.field(TRIGGER_SIZE, bytes.size)
.field(TRIGGER_FIELD, bytes)
.endObject()
.endObject()
}

companion object {
const val TRIGGER_FIELD = "trigger"
const val TRIGGER_SIZE = "size"
const val REMOTE_MONITOR_TRIGGER_FIELD = "remote_monitor_trigger"

val XCONTENT_REGISTRY = NamedXContentRegistry.Entry(
Trigger::class.java,
ParseField(REMOTE_MONITOR_TRIGGER_FIELD),
CheckedFunction { parseInner(it) }
)

fun parseInner(xcp: XContentParser): RemoteMonitorTrigger {
var id = UUIDs.base64UUID() // assign a default triggerId if one is not specified
lateinit var name: String
lateinit var severity: String
val actions: MutableList<Action> = mutableListOf()
var bytes: ByteArray? = null
var size: Int = 0

if (xcp.currentToken() != XContentParser.Token.START_OBJECT && xcp.currentToken() != XContentParser.Token.FIELD_NAME) {
XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.tokenLocation)
}

// If the parser began on START_OBJECT, move to the next token so that the while loop enters on
// the fieldName (or END_OBJECT if it's empty).
if (xcp.currentToken() == XContentParser.Token.START_OBJECT) xcp.nextToken()
while (xcp.currentToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
Trigger.ID_FIELD -> id = xcp.text()
Trigger.NAME_FIELD -> name = xcp.text()
Trigger.SEVERITY_FIELD -> severity = xcp.text()
Trigger.ACTIONS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
actions.add(Action.parse(xcp))
}
}
TRIGGER_FIELD -> bytes = xcp.binaryValue()
TRIGGER_SIZE -> size = xcp.intValue()
}
xcp.nextToken()
}
val trigger = BytesReference.fromByteBuffer(ByteBuffer.wrap(bytes, 0, size))
return RemoteMonitorTrigger(id, name, severity, actions, trigger)
}
}
}
10 changes: 9 additions & 1 deletion src/test/kotlin/org/opensearch/commons/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.model.action.Throttle
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.commons.alerting.util.getBucketKeysHash
import org.opensearch.commons.alerting.util.string
import org.opensearch.commons.authuser.User
Expand Down Expand Up @@ -514,6 +515,12 @@ fun parser(xc: String): XContentParser {
return parser
}

fun parser(xc: ByteArray): XContentParser {
val parser = XContentType.JSON.xContent().createParser(xContentRegistry(), LoggingDeprecationHandler.INSTANCE, xc)
parser.nextToken()
return parser
}

fun xContentRegistry(): NamedXContentRegistry {
return NamedXContentRegistry(
listOf(
Expand All @@ -523,7 +530,8 @@ fun xContentRegistry(): NamedXContentRegistry {
BucketLevelTrigger.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
NoOpTrigger.XCONTENT_REGISTRY
NoOpTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY
) + SearchModule(Settings.EMPTY, emptyList()).namedXContents
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.ActionExecutionTime
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -21,13 +23,13 @@ import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.index.shard.ShardId
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.script.Script
import org.opensearch.test.OpenSearchTestCase
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID

class DocLevelMonitorFanOutRequestTests : OpenSearchTestCase() {
class DocLevelMonitorFanOutRequestTests {

@Test
fun `test doc level monitor fan out request as stream`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", fields = listOf(), name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf("test-index"), listOf(docQuery))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@

package org.opensearch.commons.alerting.action

import org.junit.Assert.assertEquals
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.commons.alerting.model.InputRunResults
import org.opensearch.commons.alerting.randomDocumentLevelTriggerRunResult
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.test.OpenSearchTestCase

class DocLevelMonitorFanOutResponseTests : OpenSearchTestCase() {
class DocLevelMonitorFanOutResponseTests {

@Test
fun `test doc level monitor fan out response with errors as stream`() {
val docLevelMonitorFanOutResponse = DocLevelMonitorFanOutResponse(
"nodeid",
Expand All @@ -33,6 +36,7 @@ class DocLevelMonitorFanOutResponseTests : OpenSearchTestCase() {
assertEquals(docLevelMonitorFanOutResponse.triggerResults, newDocLevelMonitorFanOutResponse.triggerResults)
}

@Test
fun `test doc level monitor fan out response as stream`() {
val workflow = DocLevelMonitorFanOutResponse(
"nodeid",
Expand Down
Loading

0 comments on commit 0b5281c

Please sign in to comment.