-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC for doc-level-alerting #277
Merged
lezzago
merged 6 commits into
opensearch-project:document-level-alerting-dev
from
skkosuri-amzn:doc-level
Jan 25, 2022
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
e736921
wip
skkosuri-amzn 4a9c08d
Merge remote-tracking branch 'upstream/document-level-alerting-dev' i…
skkosuri-amzn 3e74e1c
Initial working copy
skkosuri-amzn 8083b10
Merge branch 'document-level-alerting-dev' into doc-level
skkosuri-amzn 8e7069d
wip: algo and findings creation
skkosuri-amzn 54f993e
wip: added index to finding and fixed doc_id field to finding
skkosuri-amzn File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,17 +14,23 @@ import kotlinx.coroutines.runBlocking | |
import kotlinx.coroutines.withContext | ||
import org.apache.logging.log4j.LogManager | ||
import org.opensearch.action.bulk.BackoffPolicy | ||
import org.opensearch.action.index.IndexRequest | ||
import org.opensearch.action.search.SearchRequest | ||
import org.opensearch.action.search.SearchResponse | ||
import org.opensearch.action.support.WriteRequest | ||
import org.opensearch.alerting.alerts.AlertIndices | ||
import org.opensearch.alerting.alerts.moveAlerts | ||
import org.opensearch.alerting.core.JobRunner | ||
import org.opensearch.alerting.core.model.ScheduledJob | ||
import org.opensearch.alerting.elasticapi.InjectorContextElement | ||
import org.opensearch.alerting.elasticapi.retry | ||
import org.opensearch.alerting.elasticapi.string | ||
import org.opensearch.alerting.model.ActionRunResult | ||
import org.opensearch.alerting.model.Alert | ||
import org.opensearch.alerting.model.AlertingConfigAccessor | ||
import org.opensearch.alerting.model.BucketLevelTrigger | ||
import org.opensearch.alerting.model.BucketLevelTriggerRunResult | ||
import org.opensearch.alerting.model.Finding | ||
import org.opensearch.alerting.model.InputRunResults | ||
import org.opensearch.alerting.model.Monitor | ||
import org.opensearch.alerting.model.MonitorRunResult | ||
|
@@ -39,6 +45,8 @@ import org.opensearch.alerting.model.action.AlertCategory | |
import org.opensearch.alerting.model.action.PerAlertActionScope | ||
import org.opensearch.alerting.model.action.PerExecutionActionScope | ||
import org.opensearch.alerting.model.destination.DestinationContextFactory | ||
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput | ||
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery | ||
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext | ||
import org.opensearch.alerting.script.TriggerExecutionContext | ||
|
@@ -60,17 +68,32 @@ import org.opensearch.alerting.util.getCombinedTriggerRunResult | |
import org.opensearch.alerting.util.isADMonitor | ||
import org.opensearch.alerting.util.isAllowed | ||
import org.opensearch.alerting.util.isBucketLevelMonitor | ||
import org.opensearch.alerting.util.isDocLevelMonitor | ||
import org.opensearch.alerting.util.updateMonitor | ||
import org.opensearch.client.Client | ||
import org.opensearch.cluster.routing.ShardRouting | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.common.Strings | ||
import org.opensearch.common.component.AbstractLifecycleComponent | ||
import org.opensearch.common.settings.Settings | ||
import org.opensearch.common.xcontent.NamedXContentRegistry | ||
import org.opensearch.common.xcontent.ToXContent | ||
import org.opensearch.common.xcontent.XContentBuilder | ||
import org.opensearch.common.xcontent.XContentType | ||
import org.opensearch.index.query.BoolQueryBuilder | ||
import org.opensearch.index.query.QueryBuilders | ||
import org.opensearch.rest.RestStatus | ||
import org.opensearch.script.Script | ||
import org.opensearch.script.ScriptService | ||
import org.opensearch.script.TemplateScript | ||
import org.opensearch.search.SearchHits | ||
import org.opensearch.search.builder.SearchSourceBuilder | ||
import org.opensearch.search.sort.SortOrder | ||
import org.opensearch.threadpool.ThreadPool | ||
import java.io.IOException | ||
import java.time.Instant | ||
import java.util.UUID | ||
import kotlin.collections.HashMap | ||
import kotlin.coroutines.CoroutineContext | ||
|
||
object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | ||
|
@@ -247,6 +270,8 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
launch { | ||
if (job.isBucketLevelMonitor()) { | ||
runBucketLevelMonitor(job, periodStart, periodEnd) | ||
} else if (job.isDocLevelMonitor()) { | ||
runDocLevelMonitor(job, periodStart, periodEnd) | ||
} else { | ||
runQueryLevelMonitor(job, periodStart, periodEnd) | ||
} | ||
|
@@ -707,4 +732,182 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() { | |
.newInstance(template.params + mapOf("ctx" to ctx.asTemplateArg())) | ||
.execute() | ||
} | ||
|
||
private suspend fun runDocLevelMonitor(monitor: Monitor, periodStart: Instant, periodEnd: Instant, dryrun: Boolean = false) { | ||
|
||
logger.info("Document-level-monitor is running ...") | ||
try { | ||
validate(monitor) | ||
} catch (e: Exception) { | ||
logger.info("Failed to start Document-level-monitor. Error: ${e.message}") | ||
return | ||
} | ||
|
||
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput | ||
val index = docLevelMonitorInput.indices[0] | ||
val queries: List<DocLevelQuery> = docLevelMonitorInput.queries | ||
|
||
var lastRunContext = monitor.lastRunContext.toMutableMap() | ||
try { | ||
if (lastRunContext.isNullOrEmpty()) { | ||
lastRunContext = createRunContext(index).toMutableMap() | ||
} | ||
} catch (e: Exception) { | ||
logger.info("Failed to start Document-level-monitor $index. Error: ${e.message}") | ||
return | ||
} | ||
|
||
for (query in queries) { | ||
runForEachQuery(monitor, lastRunContext, index, query) | ||
} | ||
} | ||
|
||
private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) { | ||
val count: Int = lastRunContext["shards_count"] as Int | ||
for (i: Int in 0 until count) { | ||
val shard = i.toString() | ||
try { | ||
logger.info("Monitor execution for shard: $shard") | ||
|
||
val maxSeqNo: Long = getMaxSeqNo(index, shard) | ||
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo") | ||
|
||
// todo: scope to optimize this: in prev seqno and current max seq no are same don't search. | ||
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query) | ||
logger.info("Search hits for shard_$shard is: ${hits.hits.size}") | ||
|
||
if (hits.hits.isNotEmpty()) { | ||
createFindings(monitor, index, query, hits) | ||
} | ||
|
||
logger.info("Updating monitor: ${monitor.id}") | ||
lastRunContext[shard] = maxSeqNo.toString() | ||
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext) | ||
// note: update has to called in serial for shards of a given index. | ||
updateMonitor(client, xContentRegistry, settings, updatedMonitor) | ||
} catch (e: Exception) { | ||
logger.info("Failed to run for shard $shard. Error: ${e.message}") | ||
logger.debug("Failed to run for shard $shard", e) | ||
} | ||
} | ||
} | ||
|
||
// todo: add more validations. | ||
private fun validate(monitor: Monitor) { | ||
if (monitor.inputs.size > 1) { | ||
throw IOException("Only one input is supported with document-level-monitor.") | ||
} | ||
|
||
if (monitor.inputs[0].name() != DocLevelMonitorInput.DOC_LEVEL_INPUT_FIELD) { | ||
throw IOException("Invalid input with document-level-monitor.") | ||
} | ||
|
||
val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput | ||
if (docLevelMonitorInput.indices.size > 1) { | ||
throw IOException("Only one index is supported with document-level-monitor.") | ||
} | ||
} | ||
|
||
private fun getShardsCount(index: String): Int { | ||
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index) | ||
return allShards.size | ||
} | ||
|
||
private fun createRunContext(index: String): HashMap<String, Any> { | ||
val lastRunContext = HashMap<String, Any>() | ||
lastRunContext["index"] = index | ||
val count = getShardsCount(index) | ||
lastRunContext["shards_count"] = count | ||
|
||
for (i: Int in 0 until count) { | ||
val shard = i.toString() | ||
val maxSeqNo: Long = getMaxSeqNo(index, shard) | ||
lastRunContext[shard] = maxSeqNo | ||
} | ||
return lastRunContext | ||
} | ||
|
||
/** | ||
* Get the current max seq number of the shard. We find it by searching the last document | ||
* in the primary shard. | ||
*/ | ||
private fun getMaxSeqNo(index: String, shard: String): Long { | ||
val request: SearchRequest = SearchRequest() | ||
.indices(index) | ||
.preference("_shards:$shard") | ||
.source( | ||
SearchSourceBuilder() | ||
.version(true) | ||
.sort("_seq_no", SortOrder.DESC) | ||
.seqNoAndPrimaryTerm(true) | ||
.query(QueryBuilders.matchAllQuery()) | ||
.size(1) | ||
) | ||
val response: SearchResponse = client.search(request).actionGet() | ||
if (response.status() !== RestStatus.OK) { | ||
throw IOException("Failed to get max seq no for shard: $shard") | ||
} | ||
if (response.hits.hits.isEmpty()) | ||
return -1L | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be 0 instead or line 809 can throw errors since you cant have a seq# less or equal to -1. |
||
|
||
return response.hits.hits[0].seqNo | ||
} | ||
|
||
private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits { | ||
val boolQueryBuilder = BoolQueryBuilder() | ||
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) | ||
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) | ||
|
||
val request: SearchRequest = SearchRequest() | ||
.indices(index) | ||
.preference("_shards:$shard") | ||
.source( | ||
SearchSourceBuilder() | ||
.version(true) | ||
.query(boolQueryBuilder) | ||
.size(10000) // fixme: make this configurable. | ||
) | ||
logger.info("Request: $request") | ||
val response: SearchResponse = client.search(request).actionGet() | ||
if (response.status() !== RestStatus.OK) { | ||
throw IOException("Failed to search shard: $shard") | ||
} | ||
return response.hits | ||
} | ||
|
||
private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) { | ||
val finding = Finding( | ||
id = UUID.randomUUID().toString(), | ||
relatedDocId = getAllDocIds(hits), | ||
monitorId = monitor.id, | ||
monitorName = monitor.name, | ||
index = index, | ||
queryId = docLevelQuery.id, | ||
queryTags = docLevelQuery.tags, | ||
severity = docLevelQuery.severity, | ||
timestamp = Instant.now(), | ||
triggerId = null, // todo: add once integrated with actions/triggers | ||
triggerName = null // todo: add once integrated with actions/triggers | ||
) | ||
|
||
val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() | ||
// change this to debug. | ||
logger.info("Findings: $findingStr") | ||
|
||
// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management. | ||
val indexRequest = IndexRequest(".opensearch-alerting-findings") | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.source(findingStr, XContentType.JSON) | ||
|
||
client.index(indexRequest).actionGet() | ||
} | ||
|
||
private fun getAllDocIds(hits: SearchHits): String { | ||
var sb = StringBuilder() | ||
for (hit in hits) { | ||
sb.append(hit.id) | ||
sb.append(",") | ||
} | ||
return sb.substring(0, sb.length - 1) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this the case? I thought a monitor's execution is not done in parallel, so couldn't we do the update call after executing on all shards?