Skip to content

Commit

Permalink
percolate query implementation in doc-level alerting
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed Apr 13, 2022
1 parent 5f4d9fd commit fb8dee8
Show file tree
Hide file tree
Showing 10 changed files with 492 additions and 11 deletions.
1 change: 1 addition & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ configurations.testImplementation {

dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"
api "org.opensearch.plugin:percolator-client:${opensearch_version}"

// OpenSearch Nanny state
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.action.SearchMonitorAction
import org.opensearch.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.DocLevelMonitorQueries
import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
Expand Down Expand Up @@ -152,6 +153,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
Expand Down Expand Up @@ -257,11 +259,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
this.threadPool = threadPool
this.clusterService = clusterService
return listOf(sweeper, scheduler, runner, scheduledJobIndices)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries)
}

override fun getSettings(): List<Setting<*>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.elasticapi.string
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.Alert
Expand All @@ -30,11 +32,14 @@ import org.opensearch.client.Client
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.Strings
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilder
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
Expand Down Expand Up @@ -93,22 +98,59 @@ object DocumentReturningMonitorRunner : MonitorRunner {
}
}

val queryToDocIds = mutableMapOf<DocLevelQuery, Set<String>>()
val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext)
val idQueryMap = mutableMapOf<String, DocLevelQuery>()
queries.forEach { query ->
val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun)
/* matchingDocs.forEach {
logger.info(monitor.id + "-" + it.first)
}*/
if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor)
// logger.info(monitor.id + "-" + matchedQueriesForDocs.hits.size)

matchedQueriesForDocs.forEach { hit ->
val (id, query) = Pair(
hit.id.replace("_${monitor.id}", ""),
((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"]
)
// logger.info("found hit-$id-$query")
val docLevelQuery = DocLevelQuery(id, id, query.toString())

val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() }
docIndices.forEach { idx ->
if (queryToDocIds.containsKey(docLevelQuery)) {
queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first)
} else {
queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first)
}

if (docsToQueries.containsKey(matchingDocs[idx].first)) {
docsToQueries[matchingDocs[idx].first]?.add(id)
} else {
docsToQueries[matchingDocs[idx].first] = mutableListOf(id)
}
}
}
}

/* queries.forEach { query ->
val matchingDocIds = runForEachQuery(monitor, monitorCtx, docExecutionContext, query, index, dryrun)
queryToDocIds[query] = matchingDocIds
matchingDocIds.forEach {
docsToQueries.putIfAbsent(it, mutableListOf())
docsToQueries[it]?.add(query.id)
}
idQueryMap[query.id] = query
}
}*/
val queryInputResults = queryToDocIds.mapKeys { it.key.id }
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults)))
val queryIds = queries.map { it.id }
val queryIds = queries.map {
idQueryMap[it.id] = it
it.id
}

val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
monitor.triggers.forEach {
Expand Down Expand Up @@ -339,6 +381,56 @@ object DocumentReturningMonitorRunner : MonitorRunner {
return allShards.filter { it.primary() }.size
}

private fun getMatchingDocs(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String,
dryrun: Boolean
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
for (i: Int in 0 until count) {
val shard = i.toString()
try {
logger.info("Monitor execution for shard: $shard")

val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

// If dryrun, set the previous sequence number as 1 less than the max sequence number or 0
val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID)
max(-1, maxSeqNo - 1)
else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull()

if (dryrun) {
logger.info("it is a dryrun")
}

logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo")

val hits: SearchHits = searchShard(
monitorCtx,
index,
shard,
prevSeqNo,
maxSeqNo,
null
)
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
// logger.info("found matches")
matchingDocs.addAll(getAllDocs(hits, monitor.id))
}
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
logger.debug("Failed to run for shard $shard", e)
}
}
return matchingDocs
}

private fun runForEachQuery(
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
Expand Down Expand Up @@ -384,14 +476,17 @@ object DocumentReturningMonitorRunner : MonitorRunner {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String
query: String?
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))

if (query != null) {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

val request: SearchRequest = SearchRequest()
.indices(index)
Expand All @@ -410,7 +505,47 @@ object DocumentReturningMonitorRunner : MonitorRunner {
return response.hits
}

private fun getMatchedQueries(
monitorCtx: MonitorRunnerExecutionContext,
docs: List<BytesReference>,
monitor: Monitor
): SearchHits {
val percolateQueryBuilder = PercolateQueryBuilder("query", docs, XContentType.JSON)

val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
val searchSourceBuilder = SearchSourceBuilder()
searchSourceBuilder.query(percolateQueryBuilder)
searchRequest.source(searchSourceBuilder)

return if (monitorCtx.clusterService!!.state().routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
val response: SearchResponse = monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest).actionGet()

if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}")
}
response.hits
} else {
SearchHits.empty()
}
}

private fun getAllDocIds(hits: SearchHits): List<String> {
return hits.map { hit -> hit.id }
}

private fun getAllDocs(hits: SearchHits, monitorId: String): List<Pair<String, BytesReference>> {
return hits.map { hit ->
val sourceMap = hit.sourceAsMap

var xContentBuilder = XContentFactory.jsonBuilder().startObject()
sourceMap.forEach { (k, v) ->
xContentBuilder = xContentBuilder.field("${k}_$monitorId", v)
}
xContentBuilder = xContentBuilder.endObject()

val sourceRef = BytesReference.bytes(xContentBuilder)

Pair(hit.id, sourceRef)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.QueryBuilders
import org.opensearch.index.reindex.BulkByScrollResponse
import org.opensearch.index.reindex.DeleteByQueryAction
import org.opensearch.index.reindex.DeleteByQueryRequestBuilder
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -132,6 +136,10 @@ class TransportDeleteMonitorAction @Inject constructor(
deleteRequest,
object : ActionListener<DeleteResponse> {
override fun onResponse(response: DeleteResponse) {
val clusterState = clusterService.state()
if (clusterState.routingTable.hasIndex(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)) {
deleteDocLevelMonitorQueries()
}
actionListener.onResponse(response)
}

Expand All @@ -141,5 +149,20 @@ class TransportDeleteMonitorAction @Inject constructor(
}
)
}

private fun deleteDocLevelMonitorQueries() {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
.source(ScheduledJob.DOC_LEVEL_QUERIES_INDEX)
.filter(QueryBuilders.matchQuery("monitor_id", monitorId))
.execute(
object : ActionListener<BulkByScrollResponse> {
override fun onResponse(response: BulkByScrollResponse) {
}

override fun onFailure(t: Exception) {
}
}
)
}
}
}
Loading

0 comments on commit fb8dee8

Please sign in to comment.