From d03fa89cbbafd8ec91efcdff30cdc66f21aed773 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Wed, 12 Jul 2023 11:17:54 -0700 Subject: [PATCH] add support for fetching workflows in search monitors api Signed-off-by: Surya Sashank Nistala --- .../transport/TransportSearchMonitorAction.kt | 5 +++- .../alerting/AlertingRestTestCase.kt | 28 +++++++++++++------ .../alerting/resthandler/WorkflowRestApiIT.kt | 16 +++++++++-- 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index 7f2a5c88f..1f2e7403c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -23,6 +23,7 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.ExistsQueryBuilder @@ -60,7 +61,9 @@ class TransportSearchMonitorAction @Inject constructor( // When querying the ALL_ALERT_INDEX_PATTERN, we don't want to check whether the MONITOR_TYPE field exists // because we're querying alert indexes. if (searchMonitorRequest.searchRequest.indices().contains(ScheduledJob.SCHEDULED_JOBS_INDEX)) { - queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + val monitorWorkflowType = QueryBuilders.boolQuery().should(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + .should(QueryBuilders.existsQuery(Workflow.WORKFLOW_TYPE)) + queryBuilder.must(monitorWorkflowType) } searchSourceBuilder.query(queryBuilder) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 47250f92b..77772483f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -41,7 +41,6 @@ import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType -import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.model.Alert @@ -645,9 +644,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Alert.parse(xcp, it.id, it.version) }.filter { alert -> alert.monitorId == monitor.id } } @@ -690,9 +689,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Finding.parse(xcp) }.filter { finding -> finding.monitorId == monitor.id } } @@ -715,9 +714,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", searchParams, StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Alert.parse(xcp, it.id, it.version) } } @@ -865,6 +864,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return GetFindingsResponse(response.restStatus(), totalFindings, findings) } + protected fun searchMonitors(): SearchResponse { + var baseEndpoint = "${AlertingPlugin.MONITOR_BASE_URI}/_search?" + val request = """ + { "version" : true, + "query": { "match_all": {} } + } + """.trimIndent() + val httpResponse = adminClient().makeRequest("POST", baseEndpoint, StringEntity(request, APPLICATION_JSON)) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + return SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) + + } + protected fun indexDoc(index: String, id: String, doc: String, refresh: Boolean = true): Response { return indexDoc(client(), index, id, doc, refresh) } @@ -1582,7 +1594,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { ) assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus()) - val workflowJson = JsonXContent.jsonXContent.createParser( + val workflowJson = jsonXContent.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.entity.content ).map() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 45c17bc02..7ca3b7329 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -41,6 +41,7 @@ import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale import java.util.UUID +import java.util.stream.Collectors @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") @@ -1069,7 +1070,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() { owner = "alerting", triggers = listOf(andTrigger) ) - val workflowById = createWorkflow(workflow)!! + val workflowById = createWorkflow(workflow) assertNotNull(workflowById) val workflowId = workflowById.id @@ -1079,7 +1080,18 @@ class WorkflowRestApiIT : AlertingRestTestCase() { "test_value_1" ) ) - + val searchMonitorResponse = searchMonitors() + logger.error(searchMonitorResponse) + val jobsList = searchMonitorResponse.hits.toList() + var numMonitors = 0 + var numWorkflows = 0 + jobsList.forEach { + val map = it.sourceAsMap + if(map["type"] == "workflow") numWorkflows++ + else if(map["type"] == "monitor") numMonitors++ + } + Assert.assertEquals(numMonitors, 2) + Assert.assertEquals(numWorkflows, 1) val response = executeWorkflow(workflowId = workflowId, params = emptyMap()) val executeWorkflowResponse = entityAsMap(response) logger.info(executeWorkflowResponse)