Skip to content

Commit

Permalink
Integrate Document Level Alerting changes (#410)
Browse files Browse the repository at this point in the history
* Rebase to push doc level changes on latest main changes (#391)

* Document level alerting dev (#272)

Signed-off-by: Ashish Agrawal <[email protected]>

* Add last run context to Monitor data model

Signed-off-by: Ashish Agrawal <[email protected]>

* add Update Monitor function

Signed-off-by: Ashish Agrawal <[email protected]>

* fix integ test

Signed-off-by: Ashish Agrawal <[email protected]>

* Implemented draft of Finding data model, a new Input type, and some basic unit tests. (#260)

* Implemented draft of Finding data model, and some basic unit tests for it.

Signed-off-by: AWSHurneyt <[email protected]>

* POC for doc-level-alerting (#277)

Signed-off-by: Sriram <[email protected]>

* Add connection to triggers for doc level alerting (#316)

Signed-off-by: Ashish Agrawal <[email protected]>

* CRUD APIs integration Tests and validation"conflict resolved" (#362)

Signed-off-by: charliezhangaws <[email protected]>

* Segregate monitor runner logic for separation of concerns (#363)

* Refactor monitor runner logic for separation of concerns and better testability.

Signed-off-by: Saurabh Singh <[email protected]>

* Add action and alert flow and findings schema and additional fixes (#381)

Signed-off-by: Ashish Agrawal <[email protected]>

* Finding Search API (#385)

* Findings search API based on Annie's work

Signed-off-by: Annie Lee <[email protected]>

* Fix Search API and add IT tests

Signed-off-by: Ashish Agrawal <[email protected]>

Co-authored-by: Annie Lee <[email protected]>

* Fix integ tests and minor issues from doc level changes

Signed-off-by: Ashish Agrawal <[email protected]>

Co-authored-by: Annie Lee <[email protected]>
Co-authored-by: Daniel Doubrovkine (dB.) <[email protected]>
Co-authored-by: AWSHurneyt <[email protected]>
Co-authored-by: Sriram <[email protected]>
Co-authored-by: charliezhangaws <[email protected]>
Co-authored-by: Saurabh Singh <[email protected]>
Co-authored-by: Annie Lee <[email protected]>

* Add Trigger condition resolver which parses and evaluates the Trigger expression. (#405)

Signed-off-by: Saurabh Singh <[email protected]>

* percolate query implementation in doc-level alerting (#399)

Signed-off-by: Subhobrata Dey <[email protected]>

* Finding Index rollover (#408)

* Finding Index rollover

Signed-off-by: jiahe zhang <[email protected]>

* Apply fixes to make rollover work

Signed-off-by: Ashish Agrawal <[email protected]>

Co-authored-by: jiahe zhang <[email protected]>

Co-authored-by: Annie Lee <[email protected]>
Co-authored-by: Daniel Doubrovkine (dB.) <[email protected]>
Co-authored-by: AWSHurneyt <[email protected]>
Co-authored-by: Sriram <[email protected]>
Co-authored-by: charliezhangaws <[email protected]>
Co-authored-by: Saurabh Singh <[email protected]>
Co-authored-by: Annie Lee <[email protected]>
Co-authored-by: Saurabh Singh <[email protected]>
Co-authored-by: Subhobrata Dey <[email protected]>
  • Loading branch information
10 people authored Apr 18, 2022
1 parent c77eb31 commit 84b80de
Show file tree
Hide file tree
Showing 87 changed files with 6,385 additions and 836 deletions.
1 change: 1 addition & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ configurations.testImplementation {

dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"
api "org.opensearch.plugin:percolator-client:${opensearch_version}"

// OpenSearch Nanny state
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.percolator;

import org.opensearch.common.settings.Setting;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.search.fetch.FetchSubPhase;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;

/**
* This is a stop-gap solution & will be removed in future.
*/
// TODO for cleanup
public class PercolatorPluginExt extends Plugin implements MapperPlugin, SearchPlugin, ExtensiblePlugin {
@Override
public List<QuerySpec<?>> getQueries() {
return singletonList(new QuerySpec<>(PercolateQueryBuilderExt.NAME, PercolateQueryBuilderExt::new, PercolateQueryBuilderExt::fromXContent));
}

@Override
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
return Arrays.asList(new PercolatorMatchedSlotSubFetchPhase(), new PercolatorHighlightSubFetchPhase(context.getHighlighters()));
}

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(PercolatorFieldMapperExt.INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING);
}

@Override
public Map<String, Mapper.TypeParser> getMappers() {
return singletonMap(PercolatorFieldMapperExt.CONTENT_TYPE, new PercolatorFieldMapperExt.TypeParser());
}

@Override
public void loadExtensions(ExtensionLoader loader) {
ExtensiblePlugin.super.loadExtensions(loader);
}
}
29 changes: 27 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -166,6 +168,29 @@ class AlertService(
}
}

// TODO: clean this up so it follows the proper alert management for doc monitors
fun composeDocLevelAlert(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
result: DocumentLevelTriggerRunResult,
alertError: AlertError?
): Alert {
val currentTime = Instant.now()

val actionExecutionResults = result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
return Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
actionExecutionResults = actionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = findings, relatedDocIds = relatedDocIds
)
}

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down Expand Up @@ -295,8 +320,8 @@ class AlertService(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isHistoryEnabled()) {
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
Expand Down
37 changes: 28 additions & 9 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.IndexDestinationAction
import org.opensearch.alerting.action.IndexEmailAccountAction
Expand All @@ -32,13 +33,15 @@ import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
Expand All @@ -51,6 +54,7 @@ import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestIndexDestinationAction
import org.opensearch.alerting.resthandler.RestIndexEmailAccountAction
Expand All @@ -74,6 +78,7 @@ import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportIndexDestinationAction
import org.opensearch.alerting.transport.TransportIndexEmailAccountAction
Expand All @@ -82,6 +87,7 @@ import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
Expand All @@ -102,8 +108,8 @@ import org.opensearch.index.IndexModule
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand All @@ -122,7 +128,7 @@ import java.util.function.Supplier
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() {

override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
Expand All @@ -140,13 +146,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val ALERTING_JOB_TYPES = listOf("monitor")
}

lateinit var runner: MonitorRunner
lateinit var runner: MonitorRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
Expand Down Expand Up @@ -180,7 +188,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestSearchEmailGroupAction(),
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction()
RestGetAlertsAction(),
RestGetFindingsAction()
)
}

Expand All @@ -204,17 +213,21 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(DeleteEmailGroupAction.INSTANCE, TransportDeleteEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java)
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(GetFindingsAction.INSTANCE, TransportGetFindingsSearchAction::class.java)

)
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
)
}

Expand All @@ -234,7 +247,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunner
runner = MonitorRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
Expand All @@ -248,12 +261,13 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool)
this.threadPool = threadPool
this.clusterService = clusterService
return listOf(sweeper, scheduler, runner, scheduledJobIndices, destinationMigrationCoordinator)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

override fun getSettings(): List<Setting<*>> {
Expand Down Expand Up @@ -310,7 +324,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroDestinationSettings.EMAIL_USERNAME,
LegacyOpenDistroDestinationSettings.EMAIL_PASSWORD,
LegacyOpenDistroDestinationSettings.ALLOW_LIST,
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST,
AlertingSettings.FINDING_HISTORY_ENABLED,
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
)
}

Expand Down
Loading

0 comments on commit 84b80de

Please sign in to comment.