Skip to content

Commit

Permalink
Fixed xContent dependencies due to OSCore changes (opensearch-project…
Browse files Browse the repository at this point in the history
…#839)

Signed-off-by: Angie Zhang <[email protected]>
  • Loading branch information
Angie Zhang authored Mar 29, 2023
1 parent 1bc8c82 commit eaafedf
Show file tree
Hide file tree
Showing 91 changed files with 774 additions and 445 deletions.
2 changes: 1 addition & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ dependencies {
implementation "com.github.seancfoley:ipaddress:5.3.3"

testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
testImplementation "org.mockito:mockito-core:5.1.0"
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@
import org.opensearch.Version;
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.common.ParseField;
import org.opensearch.core.ParseField;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.common.xcontent.LoggingDeprecationHandler;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentHelper;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.analysis.FieldNameAnalyzer;
import org.opensearch.index.fielddata.IndexFieldData;
Expand Down Expand Up @@ -103,8 +103,8 @@
import java.util.Objects;
import java.util.function.Supplier;

import static org.opensearch.common.xcontent.ConstructingObjectParser.constructorArg;
import static org.opensearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;
import static org.opensearch.core.xcontent.ConstructingObjectParser.optionalConstructorArg;
import static org.opensearch.search.SearchService.ALLOW_EXPENSIVE_QUERIES;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentLocation;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentLocation;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.mapper.BinaryFieldMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.KeywordFieldMapper;
Expand Down
28 changes: 19 additions & 9 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.client.Client
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.alerts.AlertError
Expand All @@ -44,6 +42,8 @@ import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Trigger
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
import org.opensearch.search.builder.SearchSourceBuilder
Expand Down Expand Up @@ -146,17 +146,23 @@ class AlertService(
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
currentAlert.copy(
state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
state = alertState,
lastNotificationTime = currentTime,
errorMessage = alertError?.message,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} else {
Expand Down Expand Up @@ -275,7 +281,9 @@ class AlertService(
val currentTime = Instant.now()
return currentAlerts?.map {
it.value.copy(
state = Alert.State.COMPLETED, endTime = currentTime, errorMessage = null,
state = Alert.State.COMPLETED,
endTime = currentTime,
errorMessage = null,
schemaVersion = IndexUtils.alertIndexSchemaVersion
)
} ?: listOf()
Expand Down Expand Up @@ -417,8 +425,10 @@ class AlertService(

private fun contentParser(bytesReference: BytesReference): XContentParser {
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytesReference, XContentType.JSON
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
bytesReference,
XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
return xcp
Expand Down
15 changes: 13 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ import org.opensearch.common.settings.IndexScopedSettings
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.settings.SettingsFilter
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentParser
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorExtAggregationBuilder
import org.opensearch.commons.alerting.model.BucketLevelTrigger
Expand All @@ -80,6 +78,8 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -115,16 +115,27 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

companion object {
@JvmField val OPEN_SEARCH_DASHBOARDS_USER_AGENT = "OpenSearch-Dashboards"

@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")

@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"

@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"

@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"

@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"

@JvmField val EMAIL_ACCOUNT_BASE_URI = "$DESTINATION_BASE_URI/email_accounts"

@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"

@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"

@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"

@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"

@JvmField val ALERTING_JOB_TYPES = listOf("monitor")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
Expand All @@ -38,6 +36,8 @@ import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.rest.RestStatus
Expand Down Expand Up @@ -162,7 +162,11 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// TODO: Should triggerResult's aggregationResultBucket be a list? If not, getCategorizedAlertsForBucketLevelMonitor can
// be refactored to use a map instead
val categorizedAlerts = monitorCtx.alertService!!.getCategorizedAlertsForBucketLevelMonitor(
monitor, trigger, currentAlertsForTrigger, triggerResult.aggregationResultBuckets.values.toList(), findings
monitor,
trigger,
currentAlertsForTrigger,
triggerResult.aggregationResultBuckets.values.toList(),
findings
).toMutableMap()
val dedupedAlerts = categorizedAlerts.getOrDefault(AlertCategory.DEDUPED, emptyList())
var newAlerts = categorizedAlerts.getOrDefault(AlertCategory.NEW, emptyList())
Expand All @@ -178,7 +182,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
*/
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, dedupedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = true
monitor.dataSources,
dedupedAlerts,
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = true
)
newAlerts = monitorCtx.alertService!!.saveNewAlerts(monitor.dataSources, newAlerts, monitorCtx.retryPolicy!!)
}
Expand All @@ -200,9 +207,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// However, this operation will only be done if there was no trigger error, since otherwise the nextAlerts were not collected
// in favor of just using the currentAlerts as-is.
currentAlerts.forEach { (trigger, keysToAlertsMap) ->
if (triggerResults[trigger.id]?.error == null)
if (triggerResults[trigger.id]?.error == null) {
nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED)
?.addAll(monitorCtx.alertService!!.convertToCompletedAlerts(keysToAlertsMap))
}
}

for (trigger in monitor.triggers) {
Expand Down Expand Up @@ -241,7 +249,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val actionCtx = getActionContextForAlertCategory(
alertCategory, alert, triggerCtx, monitorOrTriggerError
alertCategory,
alert,
triggerCtx,
monitorOrTriggerError
)
// AggregationResultBucket should not be null here
val alertBucketKeysHash = alert.aggregationResultBucket!!.getBucketKeysHash()
Expand All @@ -267,8 +278,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
} else if (actionExecutionScope is PerExecutionActionScope || shouldDefaultToPerExecution) {
// If all categories of Alerts are empty, there is nothing to message on and we can skip the Action.
// If the error is not null, this is disregarded and the Action is executed anyway so the user can be notified.
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty())
if (monitorOrTriggerError == null && dedupedAlerts.isEmpty() && newAlerts.isEmpty() && completedAlerts.isEmpty()) {
continue
}

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
Expand Down Expand Up @@ -314,7 +326,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
// ACKNOWLEDGED Alerts should not be saved here since actions are not executed for them.
if (!dryrun && monitor.id != Monitor.NO_ID) {
monitorCtx.alertService!!.saveAlerts(
monitor.dataSources, updatedAlerts, monitorCtx.retryPolicy!!, allowUpdatingAcknowledgedAlert = false
monitor.dataSources,
updatedAlerts,
monitorCtx.retryPolicy!!,
allowUpdatingAcknowledgedAlert = false
)
// Save any COMPLETED Alerts that were not covered in updatedAlerts
monitorCtx.alertService!!.saveAlerts(
Expand Down Expand Up @@ -375,8 +390,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
)
val searchSource = monitorCtx.scriptService!!.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
query.toString(), searchParams
ScriptType.INLINE,
Script.DEFAULT_TEMPLATE_LANG,
query.toString(),
searchParams
),
TemplateScript.CONTEXT
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.ToXContent
import org.opensearch.common.xcontent.XContentBuilder
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ActionExecutionResult
Expand All @@ -42,6 +40,8 @@ import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders
import org.opensearch.percolator.PercolateQueryBuilderExt
Expand Down Expand Up @@ -442,8 +442,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
if (response.hits.hits.isEmpty())
if (response.hits.hits.isEmpty()) {
return -1L
}

return response.hits.hits[0].seqNo
}
Expand Down
14 changes: 9 additions & 5 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
Expand Down Expand Up @@ -68,8 +68,10 @@ class InputService(
val rewrittenQuery = AggregationQueryRewriter.rewriteQuery(deepCopyQuery(input.query), prevResult, monitor.triggers)
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
rewrittenQuery.toString(), searchParams
ScriptType.INLINE,
Script.DEFAULT_TEMPLATE_LANG,
rewrittenQuery.toString(),
searchParams
),
TemplateScript.CONTEXT
)
Expand Down Expand Up @@ -131,8 +133,10 @@ class InputService(
val searchParams = mapOf("period_start" to periodStart.toEpochMilli(), "period_end" to periodEnd.toEpochMilli())
val searchSource = scriptService.compile(
Script(
ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(), searchParams
ScriptType.INLINE,
Script.DEFAULT_TEMPLATE_LANG,
input.query.toString(),
searchParams
),
TemplateScript.CONTEXT
)
Expand Down
Loading

0 comments on commit eaafedf

Please sign in to comment.