-
Notifications
You must be signed in to change notification settings - Fork 105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Percolate Query implementation in doc-level alerting #399
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* SPDX-License-Identifier: Apache-2.0 | ||
* | ||
* The OpenSearch Contributors require contributions made to | ||
* this file be licensed under the Apache-2.0 license or a | ||
* compatible open source license. | ||
*/ | ||
|
||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
/* | ||
* Modifications Copyright OpenSearch Contributors. See | ||
* GitHub history for details. | ||
*/ | ||
|
||
package org.opensearch.percolator; | ||
|
||
import org.opensearch.common.settings.Setting; | ||
import org.opensearch.index.mapper.Mapper; | ||
import org.opensearch.plugins.ExtensiblePlugin; | ||
import org.opensearch.plugins.MapperPlugin; | ||
import org.opensearch.plugins.Plugin; | ||
import org.opensearch.plugins.SearchPlugin; | ||
import org.opensearch.search.fetch.FetchSubPhase; | ||
|
||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
|
||
import static java.util.Collections.singletonList; | ||
import static java.util.Collections.singletonMap; | ||
|
||
/** | ||
* This is a stop-gap solution & will be removed in future. | ||
*/ | ||
// TODO for cleanup | ||
public class PercolatorPluginExt extends Plugin implements MapperPlugin, SearchPlugin, ExtensiblePlugin { | ||
@Override | ||
public List<QuerySpec<?>> getQueries() { | ||
return singletonList(new QuerySpec<>(PercolateQueryBuilderExt.NAME, PercolateQueryBuilderExt::new, PercolateQueryBuilderExt::fromXContent)); | ||
} | ||
|
||
@Override | ||
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) { | ||
return Arrays.asList(new PercolatorMatchedSlotSubFetchPhase(), new PercolatorHighlightSubFetchPhase(context.getHighlighters())); | ||
} | ||
|
||
@Override | ||
public List<Setting<?>> getSettings() { | ||
return Arrays.asList(PercolatorFieldMapperExt.INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING); | ||
} | ||
|
||
@Override | ||
public Map<String, Mapper.TypeParser> getMappers() { | ||
return singletonMap(PercolatorFieldMapperExt.CONTENT_TYPE, new PercolatorFieldMapperExt.TypeParser()); | ||
} | ||
|
||
@Override | ||
public void loadExtensions(ExtensionLoader loader) { | ||
ExtensiblePlugin.super.loadExtensions(loader); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,21 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.alerting | ||
|
||
import kotlinx.coroutines.Dispatchers | ||
import kotlinx.coroutines.withContext | ||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.search.SearchAction | ||
import org.opensearch.action.search.SearchRequest | ||
import org.opensearch.action.search.SearchResponse | ||
import org.opensearch.action.support.WriteRequest | ||
import org.opensearch.alerting.core.model.DocLevelMonitorInput | ||
import org.opensearch.alerting.core.model.DocLevelQuery | ||
import org.opensearch.alerting.core.model.ScheduledJob | ||
import org.opensearch.alerting.elasticapi.string | ||
import org.opensearch.alerting.model.ActionRunResult | ||
import org.opensearch.alerting.model.Alert | ||
|
@@ -30,11 +37,14 @@ import org.opensearch.client.Client | |
import org.opensearch.cluster.routing.ShardRouting | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.common.Strings | ||
import org.opensearch.common.bytes.BytesReference | ||
import org.opensearch.common.xcontent.ToXContent | ||
import org.opensearch.common.xcontent.XContentBuilder | ||
import org.opensearch.common.xcontent.XContentFactory | ||
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.index.query.BoolQueryBuilder | ||
import org.opensearch.index.query.QueryBuilders | ||
import org.opensearch.percolator.PercolateQueryBuilderExt | ||
import org.opensearch.rest.RestStatus | ||
import org.opensearch.search.SearchHits | ||
import org.opensearch.search.builder.SearchSourceBuilder | ||
|
@@ -93,22 +103,46 @@ object DocumentReturningMonitorRunner : MonitorRunner { | |
} | ||
} | ||
|
||
val queryToDocIds = mutableMapOf<DocLevelQuery, Set<String>>() | ||
val queryToDocIds = mutableMapOf<DocLevelQuery, MutableSet<String>>() | ||
val docsToQueries = mutableMapOf<String, MutableList<String>>() | ||
val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext) | ||
val idQueryMap = mutableMapOf<String, DocLevelQuery>() | ||
queries.forEach { query -> | ||
val matchingDocIds = runForEachQuery(monitorCtx, docExecutionContext, query, index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets cleanup There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed it. |
||
queryToDocIds[query] = matchingDocIds | ||
matchingDocIds.forEach { | ||
docsToQueries.putIfAbsent(it, mutableListOf()) | ||
docsToQueries[it]?.add(query.id) | ||
|
||
val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, index, dryrun) | ||
|
||
if (matchingDocs.isNotEmpty()) { | ||
val matchedQueriesForDocs = getMatchedQueries(monitorCtx, matchingDocs.map { it.second }, monitor) | ||
|
||
matchedQueriesForDocs.forEach { hit -> | ||
val (id, query) = Pair( | ||
hit.id.replace("_${monitor.id}", ""), | ||
((hit.sourceAsMap["query"] as HashMap<*, *>)["query_string"] as HashMap<*, *>)["query"] | ||
) | ||
val docLevelQuery = DocLevelQuery(id, id, query.toString()) | ||
|
||
val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } | ||
docIndices.forEach { idx -> | ||
if (queryToDocIds.containsKey(docLevelQuery)) { | ||
queryToDocIds[docLevelQuery]?.add(matchingDocs[idx].first) | ||
} else { | ||
queryToDocIds[docLevelQuery] = mutableSetOf(matchingDocs[idx].first) | ||
} | ||
|
||
if (docsToQueries.containsKey(matchingDocs[idx].first)) { | ||
docsToQueries[matchingDocs[idx].first]?.add(id) | ||
} else { | ||
docsToQueries[matchingDocs[idx].first] = mutableListOf(id) | ||
} | ||
} | ||
} | ||
idQueryMap[query.id] = query | ||
} | ||
|
||
val queryInputResults = queryToDocIds.mapKeys { it.key.id } | ||
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(queryInputResults))) | ||
val queryIds = queries.map { it.id } | ||
val queryIds = queries.map { | ||
idQueryMap[it.id] = it | ||
it.id | ||
} | ||
|
||
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>() | ||
monitor.triggers.forEach { | ||
|
@@ -339,40 +373,49 @@ object DocumentReturningMonitorRunner : MonitorRunner { | |
return allShards.filter { it.primary() }.size | ||
} | ||
|
||
private fun runForEachQuery( | ||
private fun getMatchingDocs( | ||
monitor: Monitor, | ||
monitorCtx: MonitorRunnerExecutionContext, | ||
docExecutionCtx: DocumentExecutionContext, | ||
query: DocLevelQuery, | ||
index: String | ||
): Set<String> { | ||
index: String, | ||
dryrun: Boolean | ||
): List<Pair<String, BytesReference>> { | ||
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int | ||
val matchingDocs = mutableSetOf<String>() | ||
val matchingDocs = mutableListOf<Pair<String, BytesReference>>() | ||
for (i: Int in 0 until count) { | ||
val shard = i.toString() | ||
try { | ||
logger.info("Monitor execution for shard: $shard") | ||
|
||
val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() | ||
val maxSeqNo = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() | ||
logger.info("Shard_$shard has MaxSeqNo: $maxSeqNo and PrevSeqNo: $prevSeqNo") | ||
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() | ||
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") | ||
|
||
// If dryrun, set the previous sequence number as 1 less than the max sequence number or 0 | ||
val prevSeqNo = if (dryrun || monitor.id == Monitor.NO_ID) | ||
max(-1, maxSeqNo - 1) | ||
else docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() | ||
|
||
if (dryrun) { | ||
logger.info("it is a dryrun") | ||
} | ||
|
||
logger.info("prevSeq: $prevSeqNo, maxSeq: $maxSeqNo") | ||
Comment on lines
+393
to
+402
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we simplify this into a single block for dry run and actual run? |
||
|
||
val hits: SearchHits = searchShard( | ||
monitorCtx, | ||
index, | ||
shard, | ||
prevSeqNo, | ||
maxSeqNo, | ||
query.query | ||
null | ||
) | ||
logger.info("Search hits for shard_$shard is: ${hits.hits.size}") | ||
|
||
if (hits.hits.isNotEmpty()) { | ||
logger.info("found matches") | ||
matchingDocs.addAll(getAllDocIds(hits)) | ||
matchingDocs.addAll(getAllDocs(hits, monitor.id)) | ||
} | ||
} catch (e: Exception) { | ||
logger.info("Failed to run for shard $shard. Error: ${e.message}") | ||
logger.debug("Failed to run for shard $shard", e) | ||
} | ||
} | ||
return matchingDocs | ||
|
@@ -384,14 +427,17 @@ object DocumentReturningMonitorRunner : MonitorRunner { | |
shard: String, | ||
prevSeqNo: Long?, | ||
maxSeqNo: Long, | ||
query: String | ||
query: String? | ||
): SearchHits { | ||
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { | ||
return SearchHits.empty() | ||
} | ||
val boolQueryBuilder = BoolQueryBuilder() | ||
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) | ||
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) | ||
|
||
if (query != null) { | ||
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) | ||
} | ||
|
||
val request: SearchRequest = SearchRequest() | ||
.indices(index) | ||
|
@@ -410,7 +456,45 @@ object DocumentReturningMonitorRunner : MonitorRunner { | |
return response.hits | ||
} | ||
|
||
private fun getAllDocIds(hits: SearchHits): List<String> { | ||
return hits.map { hit -> hit.id } | ||
private fun getMatchedQueries( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this not to get Matching Documents instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this method matches incoming docs with queries. named another method to |
||
monitorCtx: MonitorRunnerExecutionContext, | ||
docs: List<BytesReference>, | ||
monitor: Monitor | ||
): SearchHits { | ||
val boolQueryBuilder = BoolQueryBuilder() | ||
|
||
val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) | ||
if (monitor.id.isNotEmpty()) { | ||
boolQueryBuilder.filter(QueryBuilders.matchQuery("monitor_id", monitor.id)) | ||
} | ||
boolQueryBuilder.filter(percolateQueryBuilder) | ||
|
||
val searchRequest = SearchRequest(ScheduledJob.DOC_LEVEL_QUERIES_INDEX) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need a safe check if the query index is already created/exists? It could be in Runner itself. Just ensuring the exception is floated up the Runner with the right messaging There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed the safe check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if DOC_LEVEL_QUERIES_INDEX is not created or is deleted for some reason? I dont think we have solved this use-case yet. Not a blocker, but something for a quick follow up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. the current logic does not take care of high availability. This can be improved in future. |
||
val searchSourceBuilder = SearchSourceBuilder() | ||
searchSourceBuilder.query(boolQueryBuilder) | ||
searchRequest.source(searchSourceBuilder) | ||
|
||
val response: SearchResponse = monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest).actionGet() | ||
|
||
if (response.status() !== RestStatus.OK) { | ||
throw IOException("Failed to search percolate index: ${ScheduledJob.DOC_LEVEL_QUERIES_INDEX}") | ||
} | ||
return response.hits | ||
} | ||
|
||
private fun getAllDocs(hits: SearchHits, monitorId: String): List<Pair<String, BytesReference>> { | ||
return hits.map { hit -> | ||
val sourceMap = hit.sourceAsMap | ||
|
||
var xContentBuilder = XContentFactory.jsonBuilder().startObject() | ||
sourceMap.forEach { (k, v) -> | ||
xContentBuilder = xContentBuilder.field("${k}_$monitorId", v) | ||
} | ||
xContentBuilder = xContentBuilder.endObject() | ||
|
||
val sourceRef = BytesReference.bytes(xContentBuilder) | ||
|
||
Pair(hit.id, sourceRef) | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Were we able to create the full distribution and verify the same in single and multinode setup with no issues?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. the integration test build passes for single node & multi node with 3 nodes.