Skip to content

Commit

Permalink
add support for fetching workflows in search monitors api
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <[email protected]>
  • Loading branch information
eirsep committed Jul 12, 2023
1 parent 0ba330f commit d03fa89
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
Expand Down Expand Up @@ -60,7 +61,9 @@ class TransportSearchMonitorAction @Inject constructor(
// When querying the ALL_ALERT_INDEX_PATTERN, we don't want to check whether the MONITOR_TYPE field exists
// because we're querying alert indexes.
if (searchMonitorRequest.searchRequest.indices().contains(ScheduledJob.SCHEDULED_JOBS_INDEX)) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
val monitorWorkflowType = QueryBuilders.boolQuery().should(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
.should(QueryBuilders.existsQuery(Workflow.WORKFLOW_TYPE))
queryBuilder.must(monitorWorkflowType)
}

searchSourceBuilder.query(queryBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -645,9 +644,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Alert.parse(xcp, it.id, it.version)
}.filter { alert -> alert.monitorId == monitor.id }
}
Expand Down Expand Up @@ -690,9 +689,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Finding.parse(xcp)
}.filter { finding -> finding.monitorId == monitor.id }
}
Expand All @@ -715,9 +714,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", searchParams, StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Alert.parse(xcp, it.id, it.version)
}
}
Expand Down Expand Up @@ -865,6 +864,19 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return GetFindingsResponse(response.restStatus(), totalFindings, findings)
}

protected fun searchMonitors(): SearchResponse {
var baseEndpoint = "${AlertingPlugin.MONITOR_BASE_URI}/_search?"
val request = """
{ "version" : true,
"query": { "match_all": {} }
}
""".trimIndent()
val httpResponse = adminClient().makeRequest("POST", baseEndpoint, StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
return SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))

}

protected fun indexDoc(index: String, id: String, doc: String, refresh: Boolean = true): Response {
return indexDoc(client(), index, id, doc, refresh)
}
Expand Down Expand Up @@ -1582,7 +1594,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
)
assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus())

val workflowJson = JsonXContent.jsonXContent.createParser(
val workflowJson = jsonXContent.createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
response.entity.content
).map()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
import java.util.UUID
import java.util.stream.Collectors

@TestLogging("level:DEBUG", reason = "Debug for tests.")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -1069,7 +1070,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
owner = "alerting",
triggers = listOf(andTrigger)
)
val workflowById = createWorkflow(workflow)!!
val workflowById = createWorkflow(workflow)
assertNotNull(workflowById)
val workflowId = workflowById.id

Expand All @@ -1079,7 +1080,18 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
"test_value_1"
)
)

val searchMonitorResponse = searchMonitors()
logger.error(searchMonitorResponse)
val jobsList = searchMonitorResponse.hits.toList()
var numMonitors = 0
var numWorkflows = 0
jobsList.forEach {
val map = it.sourceAsMap
if(map["type"] == "workflow") numWorkflows++
else if(map["type"] == "monitor") numMonitors++
}
Assert.assertEquals(numMonitors, 2)
Assert.assertEquals(numWorkflows, 1)
val response = executeWorkflow(workflowId = workflowId, params = emptyMap())
val executeWorkflowResponse = entityAsMap(response)
logger.info(executeWorkflowResponse)
Expand Down

0 comments on commit d03fa89

Please sign in to comment.