diff --git a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt index 5949582b..3ce81671 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/AlertingPluginInterface.kt @@ -4,6 +4,7 @@ */ package org.opensearch.commons.alerting +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse @@ -17,6 +18,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetMonitorRequest +import org.opensearch.commons.alerting.action.GetMonitorResponse import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse import org.opensearch.commons.alerting.action.GetWorkflowRequest @@ -26,6 +29,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.notifications.action.BaseResponse import org.opensearch.commons.utils.recreateObject @@ -288,6 +292,51 @@ object AlertingPluginInterface { ) } + /** + * Get Monitor interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun getMonitor( + client: NodeClient, + request: GetMonitorRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.GET_MONITOR_ACTION_TYPE, + request, + wrapActionListener(listener) { response -> + recreateObject(response) { + GetMonitorResponse( + it + ) + } + } + ) + } + + /** + * Search Monitors interface. + * @param client Node client for making transport action + * @param request The request object + * @param listener The listener for getting response + */ + fun searchMonitors( + client: NodeClient, + request: SearchMonitorRequest, + listener: ActionListener + ) { + client.execute( + AlertingActions.SEARCH_MONITORS_ACTION_TYPE, + request, + // we do not use the wrapActionListener in this case since there is no need + // to recreate any object or specially handle onResponse / onFailure. It is + // simply returning a SearchResponse. + listener + ) + } + @Suppress("UNCHECKED_CAST") private fun wrapActionListener( listener: ActionListener, diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt index c2bae396..f4e68d73 100644 --- a/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/AlertingActions.kt @@ -5,6 +5,7 @@ package org.opensearch.commons.alerting.action import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse object AlertingActions { const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write" @@ -18,6 +19,8 @@ object AlertingActions { const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack" const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack" const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe" + const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get" + const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search" @JvmField val INDEX_MONITOR_ACTION_TYPE = @@ -60,4 +63,12 @@ object AlertingActions { @JvmField val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE = ActionType(ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse) + + @JvmField + val GET_MONITOR_ACTION_TYPE = + ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse) + + @JvmField + val SEARCH_MONITORS_ACTION_TYPE = + ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse) } diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt new file mode 100644 index 00000000..6c1df281 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequest.kt @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.rest.RestRequest +import org.opensearch.search.fetch.subphase.FetchSourceContext +import java.io.IOException + +class GetMonitorRequest : ActionRequest { + val monitorId: String + val version: Long + val method: RestRequest.Method + val srcContext: FetchSourceContext? + + constructor( + monitorId: String, + version: Long, + method: RestRequest.Method, + srcContext: FetchSourceContext? + ) : super() { + this.monitorId = monitorId + this.version = version + this.method = method + this.srcContext = srcContext + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), // monitorId + sin.readLong(), // version + sin.readEnum(RestRequest.Method::class.java), // method + if (sin.readBoolean()) { + FetchSourceContext(sin) // srcContext + } else null + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(monitorId) + out.writeLong(version) + out.writeEnum(method) + out.writeBoolean(srcContext != null) + srcContext?.writeTo(out) + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt new file mode 100644 index 00000000..49903853 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponse.kt @@ -0,0 +1,126 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID +import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM +import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO +import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION +import org.opensearch.commons.notifications.action.BaseResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentFragment +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class GetMonitorResponse : BaseResponse { + var id: String + var version: Long + var seqNo: Long + var primaryTerm: Long + var monitor: Monitor? + var associatedWorkflows: List? + + constructor( + id: String, + version: Long, + seqNo: Long, + primaryTerm: Long, + monitor: Monitor?, + associatedCompositeMonitors: List?, + ) : super() { + this.id = id + this.version = version + this.seqNo = seqNo + this.primaryTerm = primaryTerm + this.monitor = monitor + this.associatedWorkflows = associatedCompositeMonitors ?: emptyList() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + id = sin.readString(), // id + version = sin.readLong(), // version + seqNo = sin.readLong(), // seqNo + primaryTerm = sin.readLong(), // primaryTerm + monitor = if (sin.readBoolean()) { + Monitor.readFrom(sin) // monitor + } else null, + associatedCompositeMonitors = sin.readList((AssociatedWorkflow)::readFrom), + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeLong(version) + out.writeLong(seqNo) + out.writeLong(primaryTerm) + if (monitor != null) { + out.writeBoolean(true) + monitor?.writeTo(out) + } else { + out.writeBoolean(false) + } + associatedWorkflows?.forEach { + it.writeTo(out) + } + } + + @Throws(IOException::class) + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(_ID, id) + .field(_VERSION, version) + .field(_SEQ_NO, seqNo) + .field(_PRIMARY_TERM, primaryTerm) + if (monitor != null) { + builder.field("monitor", monitor) + } + if (associatedWorkflows != null) { + builder.field("associated_workflows", associatedWorkflows!!.toTypedArray()) + } + return builder.endObject() + } + + class AssociatedWorkflow : ToXContentFragment { + val id: String + val name: String + + constructor(id: String, name: String) { + this.id = id + this.name = name + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder { + builder.startObject() + .field("id", id) + .field("name", name) + .endObject() + return builder + } + + fun writeTo(out: StreamOutput) { + out.writeString(id) + out.writeString(name) + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readString(), + sin.readString() + ) + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): AssociatedWorkflow { + return AssociatedWorkflow(sin) + } + } + } +} diff --git a/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt new file mode 100644 index 00000000..003d3316 --- /dev/null +++ b/src/main/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequest.kt @@ -0,0 +1,38 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.action.search.SearchRequest +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import java.io.IOException + +class SearchMonitorRequest : ActionRequest { + + val searchRequest: SearchRequest + + constructor( + searchRequest: SearchRequest + ) : super() { + this.searchRequest = searchRequest + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + searchRequest = SearchRequest(sin) + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + searchRequest.writeTo(out) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt index b4c39766..3ce4f6bb 100644 --- a/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt +++ b/src/test/kotlin/org/opensearch/commons/alerting/AlertingPluginInterfaceTests.kt @@ -10,6 +10,7 @@ import org.mockito.Mockito import org.mockito.Mockito.mock import org.mockito.junit.jupiter.MockitoExtension import org.opensearch.action.ActionType +import org.opensearch.action.search.SearchResponse import org.opensearch.client.node.NodeClient import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest @@ -23,6 +24,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest import org.opensearch.commons.alerting.action.GetAlertsResponse import org.opensearch.commons.alerting.action.GetFindingsRequest import org.opensearch.commons.alerting.action.GetFindingsResponse +import org.opensearch.commons.alerting.action.GetMonitorRequest +import org.opensearch.commons.alerting.action.GetMonitorResponse import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse import org.opensearch.commons.alerting.action.GetWorkflowRequest @@ -32,6 +35,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse import org.opensearch.commons.alerting.action.IndexWorkflowRequest import org.opensearch.commons.alerting.action.IndexWorkflowResponse import org.opensearch.commons.alerting.action.PublishFindingsRequest +import org.opensearch.commons.alerting.action.SearchMonitorRequest import org.opensearch.commons.alerting.action.SubscribeFindingsResponse import org.opensearch.commons.alerting.model.FindingDocument import org.opensearch.commons.alerting.model.FindingWithDocs @@ -269,4 +273,32 @@ internal class AlertingPluginInterfaceTests { AlertingPluginInterface.acknowledgeChainedAlerts(client, request, listener) Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) } + + @Test + fun getMonitor() { + val request = mock(GetMonitorRequest::class.java) + val response = GetMonitorResponse("test-id", 1, 1, 1, null, null) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.getMonitor(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } + + @Test + fun searchMonitors() { + val request = mock(SearchMonitorRequest::class.java) + val response = mock(SearchResponse::class.java) + val listener: ActionListener = + mock(ActionListener::class.java) as ActionListener + Mockito.doAnswer { + (it.getArgument(2) as ActionListener) + .onResponse(response) + }.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any()) + AlertingPluginInterface.searchMonitors(client, request, listener) + Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response)) + } } diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt new file mode 100644 index 00000000..c916b993 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorRequestTests.kt @@ -0,0 +1,60 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.rest.RestRequest +import org.opensearch.search.fetch.subphase.FetchSourceContext +import org.opensearch.test.OpenSearchTestCase + +class GetMonitorRequestTests : OpenSearchTestCase() { + + fun `test get monitor request`() { + + val req = GetMonitorRequest("1234", 1L, RestRequest.Method.GET, FetchSourceContext.FETCH_SOURCE) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(1L, newReq.version) + assertEquals(RestRequest.Method.GET, newReq.method) + assertEquals(FetchSourceContext.FETCH_SOURCE, newReq.srcContext) + } + + fun `test get monitor request without src context`() { + + val req = GetMonitorRequest("1234", 1L, RestRequest.Method.GET, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(1L, newReq.version) + assertEquals(RestRequest.Method.GET, newReq.method) + assertEquals(null, newReq.srcContext) + } + + fun `test head monitor request`() { + + val req = GetMonitorRequest("1234", 2L, RestRequest.Method.HEAD, FetchSourceContext.FETCH_SOURCE) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorRequest(sin) + assertEquals("1234", newReq.monitorId) + assertEquals(2L, newReq.version) + assertEquals(RestRequest.Method.HEAD, newReq.method) + assertEquals(FetchSourceContext.FETCH_SOURCE, newReq.srcContext) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt new file mode 100644 index 00000000..d91c7471 --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/GetMonitorResponseTests.kt @@ -0,0 +1,63 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.commons.alerting.model.CronSchedule +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.randomUser +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.test.OpenSearchTestCase +import java.time.Instant +import java.time.ZoneId + +class GetMonitorResponseTests : OpenSearchTestCase() { + + fun `test get monitor response`() { + val req = GetMonitorResponse("1234", 1L, 2L, 0L, null, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorResponse(sin) + assertEquals("1234", newReq.id) + assertEquals(1L, newReq.version) + assertEquals(null, newReq.monitor) + } + + fun `test get monitor response with monitor`() { + val cronExpression = "31 * * * *" // Run at minute 31. + val testInstance = Instant.ofEpochSecond(1538164858L) + + val cronSchedule = CronSchedule(cronExpression, ZoneId.of("Asia/Kolkata"), testInstance) + val monitor = Monitor( + id = "123", + version = 0L, + name = "test-monitor", + enabled = true, + schedule = cronSchedule, + lastUpdateTime = Instant.now(), + enabledTime = Instant.now(), + monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, + user = randomUser(), + schemaVersion = 0, + inputs = mutableListOf(), + triggers = mutableListOf(), + uiMetadata = mutableMapOf() + ) + val req = GetMonitorResponse("1234", 1L, 2L, 0L, monitor, null) + assertNotNull(req) + + val out = BytesStreamOutput() + req.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = GetMonitorResponse(sin) + assertEquals("1234", newReq.id) + assertEquals(1L, newReq.version) + assertNotNull(newReq.monitor) + } +} diff --git a/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt new file mode 100644 index 00000000..169814ea --- /dev/null +++ b/src/test/kotlin/org/opensearch/commons/alerting/action/SearchMonitorRequestTests.kt @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.commons.alerting.action + +import org.opensearch.action.search.SearchRequest +import org.opensearch.common.io.stream.BytesStreamOutput +import org.opensearch.common.unit.TimeValue +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import org.opensearch.test.rest.OpenSearchRestTestCase +import java.util.concurrent.TimeUnit + +class SearchMonitorRequestTests : OpenSearchTestCase() { + + fun `test search monitors request`() { + val searchSourceBuilder = SearchSourceBuilder().from(0).size(100).timeout(TimeValue(60, TimeUnit.SECONDS)) + val searchRequest = SearchRequest().indices(OpenSearchRestTestCase.randomAlphaOfLength(10)).source(searchSourceBuilder) + val searchMonitorRequest = SearchMonitorRequest(searchRequest) + assertNotNull(searchMonitorRequest) + + val out = BytesStreamOutput() + searchMonitorRequest.writeTo(out) + val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) + val newReq = SearchMonitorRequest(sin) + + assertNotNull(newReq.searchRequest) + assertEquals(1, newReq.searchRequest.indices().size) + } +}