diff --git a/.github/workflows/bwc-test-workflow.yml b/.github/workflows/bwc-test-workflow.yml index ea28b76c8..66174c697 100644 --- a/.github/workflows/bwc-test-workflow.yml +++ b/.github/workflows/bwc-test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ 14 ] + java: [ 11 ] # Job name name: Build and test Alerting # This job runs on Linux @@ -27,11 +27,11 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 14 + - name: Set Up JDK 11 uses: actions/setup-java@v1 with: - java-version: 14 + java-version: 11 - name: Run Alerting Backwards Compatibility Tests run: | echo "Running backwards compatibility tests..." - ./gradlew bwcTestSuite -Dbuild.qualifier=alpha1 + ./gradlew bwcTestSuite diff --git a/.github/workflows/multi-node-test-workflow.yml b/.github/workflows/multi-node-test-workflow.yml index 4722b58a0..b6377ed5d 100644 --- a/.github/workflows/multi-node-test-workflow.yml +++ b/.github/workflows/multi-node-test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ 11 ] + java: [ 11, 17 ] # Job name name: Build and test Alerting # This job runs on Linux @@ -27,4 +27,4 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 - name: Run integration tests with multi node config - run: ./gradlew integTest -PnumNodes=3 -Dbuild.qualifier=alpha1 + run: ./gradlew integTest -PnumNodes=3 diff --git a/.github/workflows/security-test-workflow.yml b/.github/workflows/security-test-workflow.yml index 8dfe68b5f..7cb66242c 100644 --- a/.github/workflows/security-test-workflow.yml +++ b/.github/workflows/security-test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [ 11 ] + java: [ 11, 17 ] # Job name name: Build and test Alerting # This job runs on Linux @@ -27,38 +27,36 @@ jobs: - name: Checkout Branch uses: actions/checkout@v2 # This step uses the setup-java Github action: https://github.com/actions/setup-java - - name: Set Up JDK 11 + - name: Set Up JDK ${{ matrix.java }} uses: actions/setup-java@v1 with: - java-version: 11 + java-version: ${{ matrix.java }} - name: Build Alerting # Only assembling since the full build is governed by other workflows - run: ./gradlew assemble -Dbuild.qualifier=alpha1 + run: ./gradlew assemble - name: Pull and Run Docker run: | - plugin=`ls alerting/build/distributions/*.zip` + plugin=`basename $(ls alerting/build/distributions/*.zip)` list_of_files=`ls` list_of_all_files=`ls alerting/build/distributions/` version=`echo $plugin|awk -F- '{print $3}'| cut -d. -f 1-3` plugin_version=`echo $plugin|awk -F- '{print $3}'| cut -d. -f 1-4` qualifier=`echo $plugin|awk -F- '{print $4}'| cut -d. -f 1-1` candidate_version=`echo $plugin|awk -F- '{print $5}'| cut -d. -f 1-1` - docker_version=$version-$qualifier-candidate_version + docker_version=$version-$qualifier - if [ -z $candidate_version ]; then - candidate_version=$qualifier && qualifier="" - docker_version=$version - fi + [[ -z $candidate_version ]] && candidate_version=$qualifier && qualifier="" - echo $version $plugin_version $qualifier $candidate_version $docker_version + echo plugin version plugin_version qualifier candidate_version docker_version + echo "($plugin) ($version) ($plugin_version) ($qualifier) ($candidate_version) ($docker_version)" echo $ls $list_of_all_files if docker pull opensearchstaging/opensearch:$docker_version then echo "FROM opensearchstaging/opensearch:$docker_version" >> Dockerfile echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-alerting ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-alerting; fi" >> Dockerfile - echo "ADD alerting/build/distributions/opensearch-alerting-$plugin_version-$candidate_version.zip /tmp/" >> Dockerfile - echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/opensearch-alerting-$plugin_version-$candidate_version.zip" >> Dockerfile + echo "ADD alerting/build/distributions/$plugin /tmp/" >> Dockerfile + echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/$plugin" >> Dockerfile docker build -t opensearch-alerting:test . echo "imagePresent=true" >> $GITHUB_ENV diff --git a/.github/workflows/test-workflow.yml b/.github/workflows/test-workflow.yml index b764b39c7..c83a25fd5 100644 --- a/.github/workflows/test-workflow.yml +++ b/.github/workflows/test-workflow.yml @@ -12,7 +12,7 @@ jobs: build: strategy: matrix: - java: [11, 14] + java: [11, 17] # Job name name: Build Alerting with JDK ${{ matrix.java }} # This job runs on Linux @@ -27,7 +27,7 @@ jobs: with: java-version: ${{ matrix.java }} - name: Build and run with Gradle - run: ./gradlew build -Dbuild.qualifier=alpha1 + run: ./gradlew build - name: Create Artifact Path run: | mkdir -p alerting-artifacts diff --git a/alerting/build.gradle b/alerting/build.gradle index 47d0026f5..7a97478f2 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -111,6 +111,20 @@ testClusters.integTest { debugPort += 1 } } + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications-core").getSingleFile() } + } + })) + + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() } + } + })) } testClusters.integTest.nodes.each { node -> @@ -187,7 +201,19 @@ task prepareBwcTests { dependsOn bundle doLast { plugins = [ - project.getObjects().fileProperty().value(bundle.getArchiveFile()) + project.getObjects().fileProperty().value(bundle.getArchiveFile()), + provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications-core").getSingleFile() } + } + }), + provider({ + new RegularFile() { + @Override + File getAsFile() { fileTree("src/test/resources/notifications").getSingleFile() } + } + }) ] } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 655bdea6b..c241fdaf1 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -17,9 +17,6 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertError import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.firstFailureOrNull -import org.opensearch.alerting.elasticapi.retry -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket @@ -29,6 +26,9 @@ import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.model.Trigger import org.opensearch.alerting.model.action.AlertCategory +import org.opensearch.alerting.opensearchapi.firstFailureOrNull +import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.getBucketKeysHash diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 7afe7c856..2e74e13bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -82,6 +82,7 @@ import org.opensearch.alerting.transport.TransportIndexMonitorAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction +import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.node.DiscoveryNodes @@ -149,6 +150,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var threadPool: ThreadPool lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService + lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator override fun getRestHandlers( settings: Settings, @@ -248,9 +250,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) scheduler = JobScheduler(threadPool, runner) sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES) + destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool) this.threadPool = threadPool this.clusterService = clusterService - return listOf(sweeper, scheduler, runner, scheduledJobIndices) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, destinationMigrationCoordinator) } override fun getSettings(): List> { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 4af66dc7c..a5916a73c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -10,11 +10,11 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.convertToMap -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.TriggerAfterKey +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.util.AggregationQueryRewriter import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.executeTransportAction diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt index a109cce47..8ffe31bfe 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt @@ -17,9 +17,6 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.moveAlerts import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.InjectorContextElement -import org.opensearch.alerting.elasticapi.retry -import org.opensearch.alerting.elasticapi.withClosableContext import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.AlertingConfigAccessor @@ -39,6 +36,9 @@ import org.opensearch.alerting.model.action.AlertCategory import org.opensearch.alerting.model.action.PerAlertActionScope import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.destination.DestinationContextFactory +import org.opensearch.alerting.opensearchapi.InjectorContextElement +import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerExecutionContext diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt index 99f5be86c..72d788684 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertError.kt @@ -5,8 +5,8 @@ package org.opensearch.alerting.alerts -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt index 85c20e0cc..4b13a3085 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt @@ -23,7 +23,7 @@ import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX -import org.opensearch.alerting.elasticapi.suspendUntil +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_ENABLED import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_HISTORY_INDEX_MAX_AGE diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt index 880faa332..06b9bea48 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt @@ -13,9 +13,9 @@ import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX import org.opensearch.alerting.alerts.AlertIndices.Companion.HISTORY_WRITE_INDEX -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt index f44fcf337..ecdbd8ea4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ActionExecutionResult.kt @@ -5,8 +5,8 @@ package org.opensearch.alerting.model -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt index 6fffd0032..e892bf560 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Alert.kt @@ -6,9 +6,9 @@ package org.opensearch.alerting.model import org.opensearch.alerting.alerts.AlertError -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt index d96a4e566..7e982dd41 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/AlertingConfigAccessor.kt @@ -10,10 +10,10 @@ import kotlinx.coroutines.withContext import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.suspendUntil import org.opensearch.alerting.model.destination.Destination import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.client.Client import org.opensearch.common.bytes.BytesReference import org.opensearch.common.xcontent.LoggingDeprecationHandler diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt index 79d48e8b2..63131d835 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/BucketLevelTriggerRunResult.kt @@ -38,7 +38,7 @@ data class BucketLevelTriggerRunResult( override fun writeTo(out: StreamOutput) { super.writeTo(out) out.writeMap(aggregationResultBuckets, StreamOutput::writeString) { - valueOut: StreamOutput, aggResultBucket: AggregationResultBucket -> + valueOut: StreamOutput, aggResultBucket: AggregationResultBucket -> aggResultBucket.writeTo(valueOut) } out.writeMap(actionResultsMap as Map) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt index 68bcf5966..8141ebb42 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/Monitor.kt @@ -11,9 +11,9 @@ import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.Schedule import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_INPUTS import org.opensearch.alerting.settings.AlertingSettings.Companion.MONITOR_MAX_TRIGGERS import org.opensearch.alerting.settings.SupportedClusterMetricsSettings @@ -289,7 +289,7 @@ data class Monitor( requireNotNull(schedule) { "Monitor schedule is null" }, lastUpdateTime ?: Instant.now(), enabledTime, - MonitorType.valueOf(monitorType.toUpperCase(Locale.ROOT)), + MonitorType.valueOf(monitorType.uppercase(Locale.ROOT)), user, schemaVersion, inputs.toList(), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt index 8203fb6cb..7e13f9281 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/MonitorRunResult.kt @@ -8,7 +8,7 @@ package org.opensearch.alerting.model import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchException import org.opensearch.alerting.alerts.AlertError -import org.opensearch.alerting.elasticapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt index df6816718..177345b44 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/action/Throttle.kt @@ -58,7 +58,7 @@ data class Throttle( xcp.nextToken() when (fieldName) { UNIT_FIELD -> { - val unitString = xcp.text().toUpperCase(Locale.ROOT) + val unitString = xcp.text().uppercase(Locale.ROOT) require(StringUtils.equals(unitString, ChronoUnit.MINUTES.name), { "Only support MINUTES throttle unit currently" }) unit = ChronoUnit.valueOf(unitString) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt index 513f4825e..b79997703 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Chime.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.model.destination -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.Strings import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt index a03378bec..83e7b12a5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Destination.kt @@ -12,11 +12,11 @@ import org.opensearch.alerting.destination.message.ChimeMessage import org.opensearch.alerting.destination.message.CustomWebhookMessage import org.opensearch.alerting.destination.message.EmailMessage import org.opensearch.alerting.destination.message.SlackMessage -import org.opensearch.alerting.elasticapi.convertToMap -import org.opensearch.alerting.elasticapi.instant -import org.opensearch.alerting.elasticapi.optionalTimeField -import org.opensearch.alerting.elasticapi.optionalUserField import org.opensearch.alerting.model.destination.email.Email +import org.opensearch.alerting.opensearchapi.convertToMap +import org.opensearch.alerting.opensearchapi.instant +import org.opensearch.alerting.opensearchapi.optionalTimeField +import org.opensearch.alerting.opensearchapi.optionalUserField import org.opensearch.alerting.util.DestinationType import org.opensearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION import org.opensearch.alerting.util.isHostInDenylist @@ -193,7 +193,7 @@ data class Destination( schemaVersion, seqNo, primaryTerm, - DestinationType.valueOf(type.toUpperCase(Locale.ROOT)), + DestinationType.valueOf(type.uppercase(Locale.ROOT)), requireNotNull(name) { "Destination name is null" }, user, lastUpdateTime ?: Instant.now(), diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt index 0341ca598..3d7ea6c9c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/Slack.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.model.destination -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.Strings import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt index 1968435f1..418172984 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/destination/email/Email.kt @@ -173,7 +173,7 @@ data class Recipient( } return Recipient( - RecipientType.valueOf(type.toUpperCase(Locale.ROOT)), + RecipientType.valueOf(type.uppercase(Locale.ROOT)), emailGroupID, email ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt index a2063494b..a189c3f52 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt @@ -68,7 +68,7 @@ class RestGetAlertsAction : BaseRestHandler() { val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId) return RestChannelConsumer { - channel -> + channel -> client.execute(GetAlertsAction.INSTANCE, getAlertsRequest, RestToXContentListener(channel)) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetDestinationsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetDestinationsAction.kt index d0f02ce80..d6ee82f71 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetDestinationsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetDestinationsAction.kt @@ -89,7 +89,7 @@ class RestGetDestinationsAction : BaseRestHandler() { destinationType ) return RestChannelConsumer { - channel -> + channel -> client.execute(GetDestinationsAction.INSTANCE, getDestinationsRequest, RestToXContentListener(channel)) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetMonitorAction.kt index 3046a8736..4d9bac033 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetMonitorAction.kt @@ -68,7 +68,7 @@ class RestGetMonitorAction : BaseRestHandler() { } val getMonitorRequest = GetMonitorRequest(monitorId, RestActions.parseVersion(request), request.method(), srcContext) return RestChannelConsumer { - channel -> + channel -> client.execute(GetMonitorAction.INSTANCE, getMonitorRequest, RestToXContentListener(channel)) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexDestinationAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexDestinationAction.kt index 6a790cb98..a3934353a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexDestinationAction.kt @@ -86,7 +86,7 @@ class RestIndexDestinationAction : BaseRestHandler() { } val indexDestinationRequest = IndexDestinationRequest(id, seqNo, primaryTerm, refreshPolicy, request.method(), destination) return RestChannelConsumer { - channel -> + channel -> client.execute( IndexDestinationAction.INSTANCE, indexDestinationRequest, indexDestinationResponse(channel, request.method()) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt index 908b895cd..1dbce69fd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportAcknowledgeAlertAction.kt @@ -18,8 +18,8 @@ import org.opensearch.alerting.action.AcknowledgeAlertAction import org.opensearch.alerting.action.AcknowledgeAlertRequest import org.opensearch.alerting.action.AcknowledgeAlertResponse import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.optionalTimeField import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.optionalTimeField import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client import org.opensearch.common.inject.Inject diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt index 1df607015..d39fd1100 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetAlertsAction.kt @@ -15,8 +15,8 @@ import org.opensearch.alerting.action.GetAlertsAction import org.opensearch.alerting.action.GetAlertsRequest import org.opensearch.alerting.action.GetAlertsResponse import org.opensearch.alerting.alerts.AlertIndices -import org.opensearch.alerting.elasticapi.addFilter import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt index cef192700..245a1bd87 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetDestinationsAction.kt @@ -15,8 +15,8 @@ import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetDestinationsRequest import org.opensearch.alerting.action.GetDestinationsResponse import org.opensearch.alerting.core.model.ScheduledJob -import org.opensearch.alerting.elasticapi.addFilter import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexDestinationAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexDestinationAction.kt index 8a5d4daef..a41c8c179 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexDestinationAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexDestinationAction.kt @@ -306,7 +306,7 @@ class TransportIndexDestinationAction @Inject constructor( var failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } return failureReasons.toString() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailAccountAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailAccountAction.kt index c1349e3a9..9f68e2584 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailAccountAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailAccountAction.kt @@ -300,7 +300,7 @@ class TransportIndexEmailAccountAction @Inject constructor( val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailGroupAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailGroupAction.kt index a89e2cab6..5c2d541ed 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailGroupAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexEmailGroupAction.kt @@ -232,7 +232,7 @@ class TransportIndexEmailGroupAction @Inject constructor( val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index e4f04770d..50fde1514 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -477,7 +477,7 @@ class TransportIndexMonitorAction @Inject constructor( val failureReasons = StringBuilder() if (response.shardInfo.failed > 0) { response.shardInfo.failures.forEach { - entry -> + entry -> failureReasons.append(entry.reason()) } return failureReasons.toString() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index ed892619b..f97e382ff 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -13,7 +13,7 @@ import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.alerting.action.SearchMonitorAction import org.opensearch.alerting.action.SearchMonitorRequest -import org.opensearch.alerting.elasticapi.addFilter +import org.opensearch.alerting.opensearchapi.addFilter import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt index cb70d578a..2e3027991 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/SupportedClusterMetricsSettingsExtensions.kt @@ -24,7 +24,7 @@ import org.opensearch.action.admin.indices.recovery.RecoveryRequest import org.opensearch.action.admin.indices.recovery.RecoveryResponse import org.opensearch.alerting.core.model.ClusterMetricsInput import org.opensearch.alerting.core.model.ClusterMetricsInput.ClusterMetricType -import org.opensearch.alerting.elasticapi.convertToMap +import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.settings.SupportedClusterMetricsSettings import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest import org.opensearch.client.Client diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt new file mode 100644 index 000000000..3dec5174e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationConversionUtils.kt @@ -0,0 +1,184 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.http.client.utils.URIBuilder +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.Recipient +import org.opensearch.alerting.util.DestinationType +import org.opensearch.common.Strings +import org.opensearch.commons.notifications.model.Chime +import org.opensearch.commons.notifications.model.ConfigType +import org.opensearch.commons.notifications.model.Email +import org.opensearch.commons.notifications.model.EmailGroup +import org.opensearch.commons.notifications.model.EmailRecipient +import org.opensearch.commons.notifications.model.HttpMethodType +import org.opensearch.commons.notifications.model.MethodType +import org.opensearch.commons.notifications.model.NotificationConfig +import org.opensearch.commons.notifications.model.Slack +import org.opensearch.commons.notifications.model.SmtpAccount +import org.opensearch.commons.notifications.model.Webhook +import java.net.URI +import java.net.URISyntaxException +import java.util.Locale + +class DestinationConversionUtils { + + companion object { + + fun convertDestinationToNotificationConfig(destination: Destination): NotificationConfig? { + when (destination.type) { + DestinationType.CHIME -> { + val alertChime = destination.chime ?: return null + val chime = Chime(alertChime.url) + val description = "Chime destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.CHIME, + chime + ) + } + DestinationType.SLACK -> { + val alertSlack = destination.slack ?: return null + val slack = Slack(alertSlack.url) + val description = "Slack destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.SLACK, + slack + ) + } + // TODO: Add this back after adding SNS to Destination data models +// DestinationType.SNS -> { +// val alertSNS = destination.sns ?: return null +// val sns = Sns(alertSNS.topicARN, alertSNS.roleARN) +// val description = "SNS destination created from the Alerting plugin" +// return NotificationConfig( +// destination.name, +// description, +// ConfigType.SNS, +// sns +// ) +// } + DestinationType.CUSTOM_WEBHOOK -> { + val alertWebhook = destination.customWebhook ?: return null + val uri = buildUri( + alertWebhook.url, + alertWebhook.scheme, + alertWebhook.host, + alertWebhook.port, + alertWebhook.path, + alertWebhook.queryParams + ).toString() + val methodType = when (alertWebhook.method?.uppercase(Locale.ENGLISH)) { + "POST" -> HttpMethodType.POST + "PUT" -> HttpMethodType.PUT + "PATCH" -> HttpMethodType.PATCH + else -> HttpMethodType.POST + } + val webhook = Webhook(uri, alertWebhook.headerParams, methodType) + val description = "Webhook destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.WEBHOOK, + webhook + ) + } + DestinationType.EMAIL -> { + val alertEmail = destination.email ?: return null + val recipients = mutableListOf() + val emailGroupIds = mutableListOf() + alertEmail.recipients.forEach { + if (it.type == Recipient.RecipientType.EMAIL_GROUP) + it.emailGroupID?.let { emailGroup -> emailGroupIds.add(emailGroup) } + else it.email?.let { emailRecipient -> recipients.add(EmailRecipient(emailRecipient)) } + } + + val email = Email(alertEmail.emailAccountID, recipients, emailGroupIds) + val description = "Email destination created from the Alerting plugin" + return NotificationConfig( + destination.name, + description, + ConfigType.EMAIL, + email + ) + } + else -> return null + } + } + + fun convertEmailAccountToNotificationConfig(emailAccount: EmailAccount): NotificationConfig { + val methodType = convertAlertingToNotificationMethodType(emailAccount.method) + val smtpAccount = SmtpAccount(emailAccount.host, emailAccount.port, methodType, emailAccount.email) + val description = "Email account created from the Alerting plugin" + return NotificationConfig( + emailAccount.name, + description, + ConfigType.SMTP_ACCOUNT, + smtpAccount + ) + } + + fun convertEmailGroupToNotificationConfig( + emailGroup: org.opensearch.alerting.model.destination.email.EmailGroup + ): NotificationConfig { + val recipients = mutableListOf() + emailGroup.emails.forEach { + recipients.add(EmailRecipient(it.email)) + } + val notificationEmailGroup = EmailGroup(recipients) + + val description = "Email group created from the Alerting plugin" + return NotificationConfig( + emailGroup.name, + description, + ConfigType.EMAIL_GROUP, + notificationEmailGroup + ) + } + + private fun buildUri( + endpoint: String?, + scheme: String?, + host: String?, + port: Int, + path: String?, + queryParams: Map + ): URI? { + return try { + if (Strings.isNullOrEmpty(endpoint)) { + if (host == null) { + throw IllegalStateException("No host was provided when endpoint was null") + } + var uriScheme = scheme + if (Strings.isNullOrEmpty(scheme)) { + uriScheme = "https" + } + val uriBuilder = URIBuilder() + if (queryParams.isNotEmpty()) { + for ((key, value) in queryParams) uriBuilder.addParameter(key, value) + } + return uriBuilder.setScheme(uriScheme).setHost(host).setPort(port).setPath(path).build() + } + URIBuilder(endpoint).build() + } catch (e: URISyntaxException) { + throw IllegalStateException("Error creating URI", e) + } + } + + private fun convertAlertingToNotificationMethodType(alertMethodType: EmailAccount.MethodType): MethodType { + return when (alertMethodType) { + EmailAccount.MethodType.NONE -> MethodType.NONE + EmailAccount.MethodType.SSL -> MethodType.SSL + EmailAccount.MethodType.TLS -> MethodType.START_TLS + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt new file mode 100644 index 000000000..a806e3aeb --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationCoordinator.kt @@ -0,0 +1,98 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import org.apache.logging.log4j.LogManager +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.ClusterChangedEvent +import org.opensearch.cluster.ClusterStateListener +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.component.LifecycleListener +import org.opensearch.common.unit.TimeValue +import org.opensearch.threadpool.Scheduler +import org.opensearch.threadpool.ThreadPool +import kotlin.coroutines.CoroutineContext + +class DestinationMigrationCoordinator( + private val client: Client, + private val clusterService: ClusterService, + private val threadPool: ThreadPool +) : ClusterStateListener, CoroutineScope, LifecycleListener() { + + private val logger = LogManager.getLogger(javaClass) + + override val coroutineContext: CoroutineContext + get() = Dispatchers.Default + CoroutineName("DestinationMigrationCoordinator") + + private var scheduledMigration: Scheduler.Cancellable? = null + + @Volatile + private var runningLock = false + + init { + clusterService.addListener(this) + clusterService.addLifecycleListener(this) + } + + override fun clusterChanged(event: ClusterChangedEvent) { + logger.info("Detected cluster change event for destination migration") + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Reset destination migration process.") + scheduledMigration?.cancel() + DestinationMigrationUtilService.finishFlag = false + } + if ( + event.localNodeMaster() && + !runningLock && + (scheduledMigration == null || scheduledMigration!!.isCancelled) + ) { + try { + runningLock = true + initMigrateDestinations() + } finally { + runningLock = false + } + } else if (!event.localNodeMaster()) { + scheduledMigration?.cancel() + } + } + + private fun initMigrateDestinations() { + if (!clusterService.state().nodes().isLocalNodeElectedMaster) { + scheduledMigration?.cancel() + return + } + + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Destination migration is already complete, cancelling migration process.") + scheduledMigration?.cancel() + return + } + + val scheduledJob = Runnable { + launch { + try { + if (DestinationMigrationUtilService.finishFlag) { + logger.info("Cancel background destination migration process.") + scheduledMigration?.cancel() + } + + logger.info("Performing migration of destination data.") + DestinationMigrationUtilService.migrateDestinations(client as NodeClient) + } catch (e: Exception) { + logger.error("Failed to migrate destination data", e) + } + } + } + + scheduledMigration = threadPool.scheduleWithFixedDelay(scheduledJob, TimeValue.timeValueMinutes(1), ThreadPool.Names.MANAGEMENT) + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt new file mode 100644 index 000000000..b225a1e93 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilService.kt @@ -0,0 +1,221 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.search.SearchRequest +import org.opensearch.action.search.SearchResponse +import org.opensearch.action.support.WriteRequest +import org.opensearch.alerting.core.model.ScheduledJob +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertDestinationToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertEmailAccountToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.DestinationConversionUtils.Companion.convertEmailGroupToNotificationConfig +import org.opensearch.alerting.util.destinationmigration.NotificationApiUtils.Companion.createNotificationConfig +import org.opensearch.client.node.NodeClient +import org.opensearch.common.Strings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.NamedXContentRegistry +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentParser +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.notifications.action.CreateNotificationConfigRequest +import org.opensearch.commons.notifications.model.NotificationConfig +import org.opensearch.commons.notifications.model.NotificationConfigInfo +import org.opensearch.index.query.QueryBuilders +import org.opensearch.rest.RestStatus +import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.search.fetch.subphase.FetchSourceContext +import java.time.Instant + +class DestinationMigrationUtilService { + + companion object { + + private val logger = LogManager.getLogger(DestinationMigrationUtilService::class) + + @Volatile + private var runningLock = false // In case 2 migrateDestinations() processes are running + + // Used in DestinationMigrationCoordinator to cancel scheduled process + @Volatile + var finishFlag = false + internal set + + suspend fun migrateDestinations(client: NodeClient) { + if (runningLock) { + logger.info("There is already a migrate destination process running...") + return + } else if (finishFlag) { + logger.info("Destination migration has finished.") + return + } + try { + runningLock = true + + val destinationsToMigrate = retrieveDestinationsToMigrate(client) + logger.info("Need to migrate ${destinationsToMigrate.size} destinations") + if (destinationsToMigrate.isEmpty()) { + finishFlag = true + runningLock = false + return + } + val migratedDestinations = createNotificationChannelIfNotExists(client, destinationsToMigrate) + logger.info("Migrated ${migratedDestinations.size} destinations") + val failedDeletedDestinations = deleteOldDestinations(client, migratedDestinations) + logger.info("Failed to delete ${failedDeletedDestinations.size} destinations from migration process cleanup") + } finally { + runningLock = false + } + } + + private suspend fun deleteOldDestinations(client: NodeClient, destinationIds: List): List { + val bulkDeleteRequest = BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + destinationIds.forEach { + val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, it) + bulkDeleteRequest.add(deleteRequest) + } + + val failedToDeleteDestinations = mutableListOf() + try { + val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkDeleteRequest, it) } + failedToDeleteDestinations.addAll(bulkResponse.items.filter { it.isFailed }.map { it.id }) + } catch (e: Exception) { + logger.error("Failed to delete all destinations", e) + failedToDeleteDestinations.addAll(destinationIds) + } + return failedToDeleteDestinations + } + + private suspend fun createNotificationChannelIfNotExists( + client: NodeClient, + notificationConfigInfoList: List> + ): List { + val migratedNotificationConfigs = mutableListOf() + notificationConfigInfoList.forEach { + val notificationConfigInfo = it.first + val userStr = it.second + val createNotificationConfigRequest = CreateNotificationConfigRequest( + notificationConfigInfo.notificationConfig, + notificationConfigInfo.configId + ) + try { + // TODO: recreate user object to pass along the same permissions. Make sure this works when user based security is removed + client.threadPool().threadContext.stashContext().use { + if (userStr.isNotBlank()) { + client.threadPool().threadContext + .putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + } + val createResponse = createNotificationConfig(client, createNotificationConfigRequest) + migratedNotificationConfigs.add(createResponse.configId) + logger.debug(("Migrated destination: ${createResponse.configId}")) + } + } catch (e: Exception) { + if (e.message?.contains("version conflict, document already exists") == true) { + migratedNotificationConfigs.add(notificationConfigInfo.configId) + } else { + logger.warn( + "Failed to migrate over Destination ${notificationConfigInfo.configId} because failed to " + + "create channel in Notification plugin.", + e + ) + } + } + } + return migratedNotificationConfigs + } + + private suspend fun retrieveDestinationsToMigrate(client: NodeClient): List> { + var start = 0 + val size = 100 + val notificationConfigInfoList = mutableListOf>() + var hasMoreResults = true + + while (hasMoreResults) { + val searchSourceBuilder = SearchSourceBuilder() + .size(size) + .from(start) + .fetchSource(FetchSourceContext(true, Strings.EMPTY_ARRAY, Strings.EMPTY_ARRAY)) + .seqNoAndPrimaryTerm(true) + .version(true) + val queryBuilder = QueryBuilders.boolQuery() + .should(QueryBuilders.existsQuery("email_account")) + .should(QueryBuilders.existsQuery("email_group")) + .should(QueryBuilders.existsQuery("destination")) + searchSourceBuilder.query(queryBuilder) + + val searchRequest = SearchRequest() + .source(searchSourceBuilder) + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } + + if (response.status() != RestStatus.OK) { + logger.error("Failed to retrieve destinations to migrate") + hasMoreResults = false + } else { + if (response.hits.hits.isEmpty()) { + hasMoreResults = false + } + for (hit in response.hits) { + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) + var notificationConfig: NotificationConfig? + var userStr = "" + when { + hit.sourceAsString.contains("\"email_group\"") -> { + val emailGroup = EmailGroup.parseWithType(xcp, hit.id, hit.version) + notificationConfig = convertEmailGroupToNotificationConfig(emailGroup) + } + hit.sourceAsString.contains("\"email_account\"") -> { + val emailAccount = EmailAccount.parseWithType(xcp, hit.id, hit.version) + notificationConfig = convertEmailAccountToNotificationConfig(emailAccount) + } + else -> { + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.FIELD_NAME, xcp.nextToken(), xcp) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val destination = Destination.parse( + xcp, + hit.id, + hit.version, + hit.seqNo.toInt(), + hit.primaryTerm.toInt() + ) + userStr = destination.user.toString() + notificationConfig = convertDestinationToNotificationConfig(destination) + } + } + + if (notificationConfig != null) + notificationConfigInfoList.add( + Pair( + NotificationConfigInfo( + hit.id, + Instant.now(), + Instant.now(), + notificationConfig + ), + userStr + ) + ) + } + } + + start += size + } + + return notificationConfigInfoList + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt new file mode 100644 index 000000000..225714da2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/destinationmigration/NotificationApiUtils.kt @@ -0,0 +1,153 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.apache.logging.log4j.LogManager +import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.alerting.opensearchapi.retryForNotification +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.client.node.NodeClient +import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.ConfigConstants +import org.opensearch.commons.notifications.NotificationsPluginInterface +import org.opensearch.commons.notifications.action.CreateNotificationConfigRequest +import org.opensearch.commons.notifications.action.CreateNotificationConfigResponse +import org.opensearch.commons.notifications.action.DeleteNotificationConfigRequest +import org.opensearch.commons.notifications.action.DeleteNotificationConfigResponse +import org.opensearch.commons.notifications.action.GetNotificationConfigRequest +import org.opensearch.commons.notifications.action.GetNotificationConfigResponse +import org.opensearch.commons.notifications.action.SendNotificationRequest +import org.opensearch.commons.notifications.action.SendNotificationResponse +import org.opensearch.commons.notifications.action.UpdateNotificationConfigRequest +import org.opensearch.commons.notifications.action.UpdateNotificationConfigResponse + +class NotificationApiUtils { + + companion object { + + private val logger = LogManager.getLogger(NotificationApiUtils::class) + + private val defaultRetryPolicy = + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(100), 2) + + suspend fun getNotificationConfig( + client: NodeClient, + getNotificationConfigRequest: GetNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): GetNotificationConfigResponse { + lateinit var getNotificationConfigResponse: GetNotificationConfigResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + getNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.getNotificationConfig( + client, + getNotificationConfigRequest, + it + ) + } + } + } + return getNotificationConfigResponse + } + + suspend fun createNotificationConfig( + client: NodeClient, + createNotificationConfigRequest: CreateNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): CreateNotificationConfigResponse { + lateinit var createNotificationConfigResponse: CreateNotificationConfigResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + createNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.createNotificationConfig( + client, + createNotificationConfigRequest, + it + ) + } + } + } + return createNotificationConfigResponse + } + + suspend fun updateNotificationConfig( + client: NodeClient, + updateNotificationConfigRequest: UpdateNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): UpdateNotificationConfigResponse { + lateinit var updateNotificationConfigResponse: UpdateNotificationConfigResponse + val userStr = client.threadPool() + .threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + updateNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.updateNotificationConfig( + client, + updateNotificationConfigRequest, + it + ) + } + } + } + return updateNotificationConfigResponse + } + + suspend fun deleteNotificationConfig( + client: NodeClient, + deleteNotificationConfigRequest: DeleteNotificationConfigRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): DeleteNotificationConfigResponse { + lateinit var deleteNotificationConfigResponse: DeleteNotificationConfigResponse + val userStr = client.threadPool() + .threadContext.getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + deleteNotificationConfigResponse = NotificationsPluginInterface.suspendUntil { + this.deleteNotificationConfig( + client, + deleteNotificationConfigRequest, + it + ) + } + } + } + return deleteNotificationConfigResponse + } + + suspend fun sendNotification( + client: NodeClient, + sendNotificationRequest: SendNotificationRequest, + retryPolicy: BackoffPolicy = defaultRetryPolicy + ): SendNotificationResponse { + lateinit var sendNotificationResponse: SendNotificationResponse + val userStr = client.threadPool().threadContext + .getTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT) + client.threadPool().threadContext.stashContext().use { + client.threadPool().threadContext.putTransient(ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT, userStr) + retryPolicy.retryForNotification(logger) { + sendNotificationResponse = NotificationsPluginInterface.suspendUntil { + this.sendNotification( + client, + sendNotificationRequest.eventSource, + sendNotificationRequest.channelMessage, + sendNotificationRequest.channelIds, + it + ) + } + } + } + return sendNotificationResponse + } + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index a12f187b1..d46325a50 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -20,14 +20,17 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.settings.ScheduledJobSettings -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.BucketLevelTrigger import org.opensearch.alerting.model.Monitor import org.opensearch.alerting.model.QueryLevelTrigger +import org.opensearch.alerting.model.destination.Chime +import org.opensearch.alerting.model.destination.CustomWebhook import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.Slack import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.util.DestinationType @@ -363,19 +366,59 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { ) } - protected fun getRandomDestination(salt: String): Destination { + fun getSlackDestination(): Destination { + val slack = Slack("https://hooks.slack.com/services/slackId") return Destination( - type = DestinationType.TEST_ACTION, - name = salt + "test", + type = DestinationType.SLACK, + name = "test", user = randomUser(), lastUpdateTime = Instant.now(), chime = null, + slack = slack, + customWebhook = null, + email = null + ) + } + + fun getChimeDestination(): Destination { + val chime = Chime("https://hooks.chime.aws/incomingwebhooks/chimeId") + return Destination( + type = DestinationType.CHIME, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = chime, slack = null, customWebhook = null, email = null ) } + fun getCustomWebhookDestination(): Destination { + val customWebhook = CustomWebhook( + "https://hooks.slack.com/services/customWebhookId", + null, + null, + 80, + null, + null, + emptyMap(), + emptyMap(), + null, + null + ) + return Destination( + type = DestinationType.CUSTOM_WEBHOOK, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = null, + slack = null, + customWebhook = customWebhook, + email = null + ) + } + private fun getTestEmailAccount(): EmailAccount { return EmailAccount( name = "test", @@ -670,7 +713,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return StringEntity(toJsonString(), APPLICATION_JSON) } - private fun Destination.toJsonString(): String { + protected fun Destination.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return shuffleXContent(toXContent(builder)).string() } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt index 7ff48d4cc..27dd73647 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerIT.kt @@ -1007,7 +1007,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { val output = entityAsMap(response) @Suppress("UNCHECKED_CAST") (output["trigger_results"] as HashMap).forEach { - _, v -> + _, v -> assertTrue((v as HashMap)["triggered"] as Boolean) } assertEquals(monitor.name, output["monitor_name"]) @@ -1037,7 +1037,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { val output = entityAsMap(response) @Suppress("UNCHECKED_CAST") (output["trigger_results"] as HashMap).forEach { - _, v -> + _, v -> assertTrue((v as HashMap)["triggered"] as Boolean) } assertEquals(monitor.name, output["monitor_name"]) @@ -1064,7 +1064,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { val output = entityAsMap(response) @Suppress("UNCHECKED_CAST") (output["trigger_results"] as HashMap).forEach { - _, v -> + _, v -> assertTrue((v as HashMap)["triggered"] as Boolean) } @Suppress("UNCHECKED_CAST") @@ -1093,7 +1093,7 @@ class MonitorRunnerIT : AlertingRestTestCase() { val output = entityAsMap(response) @Suppress("UNCHECKED_CAST") (output["trigger_results"] as HashMap).forEach { - _, v -> + _, v -> assertFalse((v as HashMap)["triggered"] as Boolean) } @Suppress("UNCHECKED_CAST") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt index 710ccbb60..222637d6f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/ODFERestTestCase.kt @@ -45,6 +45,21 @@ abstract class ODFERestTestCase : OpenSearchRestTestCase() { return System.getProperty("security", "false")!!.toBoolean() } + @Suppress("UNCHECKED_CAST") + fun isNotificationPluginInstalled(): Boolean { + val response = entityAsMap(client().makeRequest("GET", "_nodes/plugins")) + val nodesInfo = response["nodes"] as Map> + for (nodeInfo in nodesInfo.values) { + val plugins = nodeInfo["plugins"] as List> + for (plugin in plugins) { + if (plugin["name"] == "opensearch-notifications") { + return true + } + } + } + return false + } + override fun getProtocol(): String { return if (isHttps()) { "https" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index f16ef1c40..8955d0766 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -15,7 +15,6 @@ import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.IntervalSchedule import org.opensearch.alerting.core.model.Schedule import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.ActionExecutionResult import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.AggregationResultBucket @@ -38,6 +37,7 @@ import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailEntry import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.client.Request import org.opensearch.client.RequestOptions diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt index d4280daaa..5d29f1107 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetAlertsResponseTests.kt @@ -7,8 +7,8 @@ package org.opensearch.alerting.action import org.junit.Assert import org.opensearch.alerting.builder -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.Alert +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.randomUser import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.io.stream.StreamInput diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt index 0f5c5969b..edee16c5f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/XContentTests.kt @@ -7,13 +7,13 @@ package org.opensearch.alerting.model import org.opensearch.alerting.builder import org.opensearch.alerting.core.model.SearchInput -import org.opensearch.alerting.elasticapi.string import org.opensearch.alerting.model.action.Action import org.opensearch.alerting.model.action.ActionExecutionPolicy import org.opensearch.alerting.model.action.PerExecutionActionScope import org.opensearch.alerting.model.action.Throttle import org.opensearch.alerting.model.destination.email.EmailAccount import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.opensearchapi.string import org.opensearch.alerting.parser import org.opensearch.alerting.randomAction import org.opensearch.alerting.randomActionExecutionPolicy diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt new file mode 100644 index 000000000..b8d6de389 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util.destinationmigration + +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.core.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.model.destination.Destination +import org.opensearch.alerting.model.destination.email.Email +import org.opensearch.alerting.model.destination.email.EmailAccount +import org.opensearch.alerting.model.destination.email.EmailEntry +import org.opensearch.alerting.model.destination.email.EmailGroup +import org.opensearch.alerting.model.destination.email.Recipient +import org.opensearch.alerting.randomUser +import org.opensearch.alerting.toJsonString +import org.opensearch.alerting.util.DestinationType +import org.opensearch.client.ResponseException +import org.opensearch.rest.RestStatus +import java.time.Instant +import java.util.UUID + +class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { + + fun `test migrateData`() { + if (isNotificationPluginInstalled()) { + // Create alerting config index + createRandomMonitor() + + val emailAccount = EmailAccount( + name = "test", + email = "test@email.com", + host = "smtp.com", + port = 25, + method = EmailAccount.MethodType.NONE, + username = null, + password = null + ) + val emailAccountDoc = "{\"email_account\" : ${emailAccount.toJsonString()}}" + val emailGroup = EmailGroup( + name = "test", + emails = listOf(EmailEntry("test@email.com")) + ) + val emailGroupDoc = "{\"email_group\" : ${emailGroup.toJsonString()}}" + val emailAccountId = UUID.randomUUID().toString() + val emailGroupId = UUID.randomUUID().toString() + indexDoc(SCHEDULED_JOBS_INDEX, emailAccountId, emailAccountDoc) + indexDoc(SCHEDULED_JOBS_INDEX, emailGroupId, emailGroupDoc) + + val recipient = Recipient(Recipient.RecipientType.EMAIL, null, "test@email.com") + val email = Email(emailAccountId, listOf(recipient)) + val emailDest = Destination( + id = UUID.randomUUID().toString(), + type = DestinationType.EMAIL, + name = "test", + user = randomUser(), + lastUpdateTime = Instant.now(), + chime = null, + slack = null, + customWebhook = null, + email = email + ) + val slackDestination = getSlackDestination().copy(id = UUID.randomUUID().toString()) + val chimeDestination = getChimeDestination().copy(id = UUID.randomUUID().toString()) + val customWebhookDestination = getCustomWebhookDestination().copy(id = UUID.randomUUID().toString()) + + val destinations = listOf(emailDest, slackDestination, chimeDestination, customWebhookDestination) + + val ids = mutableListOf(emailAccountId, emailGroupId) + for (destination in destinations) { + val dest = """ + { + "destination" : ${destination.toJsonString()} + } + """.trimIndent() + indexDoc(SCHEDULED_JOBS_INDEX, destination.id, dest) + ids.add(destination.id) + } + + // Create cluster change event and wait for migration service to complete migrating data over + client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb") + Thread.sleep(120000) + + for (id in ids) { + val response = client().makeRequest( + "GET", + "_plugins/_notifications/configs/$id" + ) + assertEquals(RestStatus.OK, response.restStatus()) + + try { + client().makeRequest( + "GET", + ".opendistro-alerting-config/_doc/$id" + ) + fail("Expecting ResponseException") + } catch (e: ResponseException) { + assertEquals(RestStatus.NOT_FOUND, e.response.restStatus()) + } + } + } + } +} diff --git a/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip new file mode 100644 index 000000000..3f05bb997 Binary files /dev/null and b/alerting/src/test/resources/notifications-core/opensearch-notifications-core-2.0.0.0-alpha1-SNAPSHOT.zip differ diff --git a/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip b/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip new file mode 100644 index 000000000..a6a63b807 Binary files /dev/null and b/alerting/src/test/resources/notifications/opensearch-notifications-2.0.0.0-alpha1-SNAPSHOT.zip differ diff --git a/build.gradle b/build.gradle index 14baeec5a..581cc01b5 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ buildscript { ext { opensearch_version = System.getProperty("opensearch.version", "2.0.0-alpha1-SNAPSHOT") - buildVersionQualifier = System.getProperty("build.version_qualifier") + buildVersionQualifier = System.getProperty("build.version_qualifier", "alpha1") isSnapshot = "true" == System.getProperty("build.snapshot", "true") // 2.0.0-alpha1-SNAPSHOT -> 2.0.0.0-alpha1-SNAPSHOT version_tokens = opensearch_version.tokenize('-') @@ -49,7 +49,7 @@ configurations { } dependencies { - add("ktlint", "com.pinterest:ktlint:0.41.0") { + add("ktlint", "com.pinterest:ktlint:0.45.1") { attributes { attribute(Bundling.BUNDLING_ATTRIBUTE, objects.named(Bundling, Bundling.EXTERNAL)) } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt index daae181e8..d778f31e6 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/JobSweeper.kt @@ -16,8 +16,8 @@ import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEE import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_BACKOFF_RETRY_COUNT import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PAGE_SIZE import org.opensearch.alerting.core.settings.ScheduledJobSettings.Companion.SWEEP_PERIOD -import org.opensearch.alerting.elasticapi.firstFailureOrNull -import org.opensearch.alerting.elasticapi.retry +import org.opensearch.alerting.opensearchapi.firstFailureOrNull +import org.opensearch.alerting.opensearchapi.retry import org.opensearch.client.Client import org.opensearch.cluster.ClusterChangedEvent import org.opensearch.cluster.ClusterStateListener diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt b/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt index 8b14fdfa0..93413d8f7 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/action/node/ScheduledJobStats.kt @@ -15,6 +15,7 @@ import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.xcontent.ToXContent import org.opensearch.common.xcontent.ToXContentFragment import org.opensearch.common.xcontent.XContentBuilder +import java.util.Locale /** * Scheduled job stat that will be generated by each node. @@ -66,7 +67,7 @@ class ScheduledJobStats : BaseNodeResponse, ToXContentFragment { override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { builder.field("name", node.name) builder.field("schedule_status", status) - builder.field("roles", node.roles.map { it.roleName().toUpperCase() }) + builder.field("roles", node.roles.map { it.roleName().uppercase(Locale.getDefault()) }) if (jobSweeperMetrics != null) { builder.startObject(RestScheduledJobStatsHandler.JOB_SCHEDULING_METRICS) jobSweeperMetrics!!.toXContent(builder, params) diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt b/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt index ada904ccc..7867dee07 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/model/Schedule.kt @@ -25,6 +25,7 @@ import java.time.ZoneId import java.time.ZonedDateTime import java.time.temporal.ChronoUnit import java.time.zone.ZoneRulesException +import java.util.Locale sealed class Schedule : Writeable, ToXContentObject { enum class TYPE { CRON, INTERVAL } @@ -73,7 +74,7 @@ sealed class Schedule : Writeable, ToXContentObject { xcp.nextToken() when (cronFieldName) { INTERVAL_FIELD -> interval = xcp.intValue() - UNIT_FIELD -> unit = ChronoUnit.valueOf(xcp.text().toUpperCase()) + UNIT_FIELD -> unit = ChronoUnit.valueOf(xcp.text().uppercase(Locale.getDefault())) } } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt similarity index 80% rename from core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt rename to core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt index d6e89654a..a49181292 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/elasticapi/ElasticExtensions.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/opensearchapi/OpenSearchExtensions.kt @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.elasticapi +package org.opensearch.alerting.opensearchapi import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.ThreadContextElement @@ -28,6 +28,7 @@ import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.InjectSecurity import org.opensearch.commons.authuser.User +import org.opensearch.commons.notifications.NotificationsPluginInterface import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.QueryBuilders import org.opensearch.rest.RestStatus @@ -66,6 +67,36 @@ fun BackoffPolicy.retry(block: () -> T): T { } while (true) } +/** + * Backs off and retries a lambda that makes a request. This retries on any Exception unless it detects the + * Notification plugin is not installed. + * + * @param logger - logger used to log intermediate failures + * @param block - the block of code to retry. This should be a suspend function. + */ +suspend fun BackoffPolicy.retryForNotification( + logger: Logger, + block: suspend () -> T +): T { + val iter = iterator() + do { + try { + return block() + } catch (e: java.lang.Exception) { + val isMissingNotificationPlugin = e.message?.contains("failed to find action") ?: false + if (isMissingNotificationPlugin) { + throw OpenSearchException("Notification plugin is not installed. Please install the Notification plugin.", e) + } else if (iter.hasNext()) { + val backoff = iter.next() + logger.warn("Notification operation failed. Retrying in $backoff.", e) + delay(backoff.millis) + } else { + throw e + } + } + } while (true) +} + /** * Retries the given [block] of code as specified by the receiver [BackoffPolicy], if [block] throws an [OpenSearchException] * that is retriable (502, 503, 504). @@ -162,6 +193,20 @@ suspend fun C.suspendUntil(block: C.(ActionListener }) } +/** + * Converts [NotificationsPluginInterface] methods that take a callback into a kotlin suspending function. + * + * @param block - a block of code that is passed an [ActionListener] that should be passed to the NotificationsPluginInterface API. + */ +suspend fun NotificationsPluginInterface.suspendUntil(block: NotificationsPluginInterface.(ActionListener) -> Unit): T = + suspendCoroutine { cont -> + block(object : ActionListener { + override fun onResponse(response: T) = cont.resume(response) + + override fun onFailure(e: Exception) = cont.resumeWithException(e) + }) + } + /** * Store a [ThreadContext] and restore a [ThreadContext] when the coroutine resumes on a different thread. * diff --git a/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt b/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt index 92b876112..610125469 100644 --- a/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt +++ b/core/src/test/kotlin/org/opensearch/alerting/core/XContentTests.kt @@ -8,7 +8,7 @@ package org.opensearch.alerting.core import org.opensearch.alerting.core.model.Input import org.opensearch.alerting.core.model.SearchInput import org.opensearch.alerting.core.model.XContentTestBase -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.xcontent.ToXContent import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder diff --git a/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt b/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt index a0be7a473..53699c36f 100644 --- a/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt +++ b/core/src/test/kotlin/org/opensearch/alerting/core/model/ScheduleTest.kt @@ -5,7 +5,7 @@ package org.opensearch.alerting.core.model -import org.opensearch.alerting.elasticapi.string +import org.opensearch.alerting.opensearchapi.string import org.opensearch.common.xcontent.ToXContent import java.time.Instant import java.time.ZoneId diff --git a/docs/document-level-alerting-rfc.md b/docs/document-level-alerting-rfc.md new file mode 100644 index 000000000..cbf1d8071 --- /dev/null +++ b/docs/document-level-alerting-rfc.md @@ -0,0 +1,22 @@ +# Document-Level Alerting RFC + +The purpose of this request for comments (RFC) is to introduce our plans to enhance the Alerting plugin with document-level alerting functionality and collect feedback and discuss our plans with the community. This RFC is meant to cover the high-level functionality and does not go into implementation details and architecture. + +## Problem Statement + +Currently, the Alerting plugin does not support a simple way to create Monitors that alert on each document in the index. As well as, there is currently no support to ensure there is no gap or overlap in data being monitored. + +A common use case for alerting on a single document would be for monitoring an index that is ingesting audit log data for a set of servers. In this case, a monitor would be to setup an alert when the IP address in the audit log indicates an IP address that is not allowed to access the servers. The expectation here would be that each time the alert is generated, the user is notified of the problem and they can then view the exact record to investigate the security breach. + +This use case can somewhat be accomplished today through a Query-level or Bucket-level monitors with a query that searches for security breaches in the audit logs. Then the trigger can generate an alert when there are any matches for security breaches and generate an alert. However, the alerts will only give a summarized view of the security breaches and the user is not given the exact records that caused the issue, so they would need to dive into the data to even find the records that caused the security breaches. + +## Proposed Solution + +The proposed solution is to offer more granular alerting by allowing alerts to be created based on each document in the index. This means that there could be multiple alerts even for a single trigger condition, ensuring that every event is captured. + +### Monitoring new data +As we are alerting on each document for Document Level Alerting, we need to ensure each monitor execution only looks at new data as there will then be duplicate alerts. Additionally, between each monitor execution, there can be no gaps in data being monitored as then that would defeat the purpose of this monitoring solution. By supporting this, the monitor executions would no longer be looking at a time range of data, which most query-level and bucket-level monitors do. + +## Providing Feedback + +If you have comments or feedback on our plans for Document-Level Alerting, please comment on the [GitHub issue](https://github.com/opensearch-project/alerting/issues/238) in this project to discuss. diff --git a/scripts/build.sh b/scripts/build.sh index ff5ed4f1f..c2f770a53 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -18,7 +18,7 @@ function usage() { echo -e "-h help" } -while getopts ":h:v:s:o:p:a:" arg; do +while getopts ":h:v:q:s:o:p:a:" arg; do case $arg in h) usage @@ -78,6 +78,6 @@ cp ${distributions}/*.zip ./$OUTPUT/plugins ./gradlew publishShadowPublicationToStagingRepository -Dopensearch.version=$VERSION -Dbuild.version_qualifier=$QUALIFIER -Dbuild.snapshot=$SNAPSHOT mkdir -p $OUTPUT/maven/org/opensearch -cp -r ./build/local-staging-repo/org/opensearch/notification $OUTPUT/maven/org/opensearch/notification +cp -r ./build/local-staging-repo/org/opensearch/. $OUTPUT/maven/org/opensearch