diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 7d87e3936..234be6fbd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @lezzago @AWSHurneyt @sbcd90 @eirsep @getsaurabh02 @praveensameneni @qreshi @bowenlan-amzn @rishabhmaurya \ No newline at end of file +* @lezzago @AWSHurneyt @sbcd90 @eirsep @getsaurabh02 @praveensameneni @qreshi @bowenlan-amzn @rishabhmaurya @engechas \ No newline at end of file diff --git a/.github/workflows/security-test-workflow.yml b/.github/workflows/security-test-workflow.yml index 1c5781d38..a096f26a0 100644 --- a/.github/workflows/security-test-workflow.yml +++ b/.github/workflows/security-test-workflow.yml @@ -73,20 +73,20 @@ jobs: if: env.imagePresent == 'true' run: | cd .. - docker run -p 9200:9200 -d -p 9600:9600 -e "discovery.type=single-node" opensearch-alerting:test + docker run -p 9200:9200 -d -p 9600:9600 -e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123!" -e "discovery.type=single-node" opensearch-alerting:test sleep 120 - name: Run Alerting Test for security enabled test cases if: env.imagePresent == 'true' run: | - cluster_running=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure` + cluster_running=`curl -XGET https://localhost:9200/_cat/plugins -u admin:myStrongPassword123! --insecure` echo $cluster_running - security=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure |grep opensearch-security|wc -l` + security=`curl -XGET https://localhost:9200/_cat/plugins -u admin:myStrongPassword123! --insecure |grep opensearch-security|wc -l` echo $security if [ $security -gt 0 ] then echo "Security plugin is available" - ./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=docker-cluster -Dsecurity=true -Dhttps=true -Duser=admin -Dpassword=admin + ./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=docker-cluster -Dsecurity=true -Dhttps=true -Duser=admin -Dpassword=myStrongPassword123! else echo "Security plugin is NOT available skipping this run as tests without security have already been run" fi diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index 4ec964729..38bb99af3 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -76,9 +76,9 @@ When launching a cluster using one of the above commands, logs are placed in `al 1. Setup a local opensearch cluster with security plugin. - - `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=admin` + - `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=` - - `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=admin --tests "org.opensearch.alerting.MonitorRunnerIT.test execute monitor returns search result"` + - `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword= --tests "org.opensearch.alerting.MonitorRunnerIT.test execute monitor returns search result"` #### Building from the IDE diff --git a/MAINTAINERS.md b/MAINTAINERS.md index bd6a69ce1..89c7bd145 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -5,7 +5,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje ## Current Maintainers | Maintainer | GitHub ID | Affiliation | -|----------------------| ------------------------------------------------- | ----------- | +|----------------------| ------------------------------------------------- |-------------| | Ashish Agrawal | [lezzago](https://github.com/lezzago) | Amazon | | Mohammad Qureshi | [qreshi](https://github.com/qreshi) | Amazon | | Bowen Lan | [bowenlan-amzn](https://github.com/bowenlan-amzn) | Amazon | @@ -15,6 +15,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje | Surya Sashank Nistala | [eirsep](https://github.com/eirsep) | Amazon | | Thomas Hurney | [AWSHurneyt](https://github.com/AWSHurneyt) | Amazon | | Praveen Sameneni | [praveensameneni](https://github.com/praveensameneni) | Amazon | +| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon | ## Emeritus diff --git a/alerting/build.gradle b/alerting/build.gradle index 5f6bd9098..f1714cbe3 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -113,7 +113,7 @@ dependencies { implementation "org.jetbrains:annotations:13.0" api project(":alerting-core") - implementation "com.github.seancfoley:ipaddress:5.3.3" + implementation "com.github.seancfoley:ipaddress:5.4.1" testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 4ba28a408..6b820cf36 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.firstFailureOrNull import org.opensearch.alerting.opensearchapi.retry @@ -190,6 +191,19 @@ class AlertService( ) } + // Including a list of triggered clusters for cluster metrics monitors + var triggeredClusters: MutableList? = null + if (result is ClusterMetricsTriggerRunResult) + result.clusterTriggerResults.forEach { + if (it.triggered) { + // Add an empty list if one isn't already present + if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf() + + // Add the cluster to the list of triggered clusters + triggeredClusters!!.add(it.cluster) + } + } + // Merge the alert's error message to the current alert's history val updatedHistory = currentAlert?.errorHistory.update(alertError) return if (alertError == null && !result.triggered) { @@ -199,7 +213,8 @@ class AlertService( errorMessage = null, errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, - schemaVersion = IndexUtils.alertIndexSchemaVersion + schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else if (alertError == null && currentAlert?.isAcknowledged() == true) { null @@ -212,6 +227,7 @@ class AlertService( errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else { val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) { @@ -223,7 +239,8 @@ class AlertService( lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message, errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, - workflowId = workflorwRunContext?.workflowId ?: "" + workflowId = workflorwRunContext?.workflowId ?: "", + clusters = triggeredClusters ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index e53187c7b..9df20dbc5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetEmailAccountAction import org.opensearch.alerting.action.GetEmailGroupAction +import org.opensearch.alerting.action.GetRemoteIndexesAction import org.opensearch.alerting.action.SearchEmailAccountAction import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.alerts.AlertIndices @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction import org.opensearch.alerting.resthandler.RestGetEmailGroupAction import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction +import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction +import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction import org.opensearch.alerting.transport.TransportGetWorkflowAction import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction import org.opensearch.alerting.transport.TransportIndexMonitorAction @@ -133,6 +136,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}") @JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors" @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" + @JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" @JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations" @@ -184,7 +188,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetWorkflowAlertsAction(), RestGetFindingsAction(), RestGetWorkflowAction(), - RestDeleteWorkflowAction() + RestDeleteWorkflowAction(), + RestGetRemoteIndexesAction(), ) } @@ -211,7 +216,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), - ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) + ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), + ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java), ) } @@ -351,7 +357,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, + AlertingSettings.REMOTE_MONITORING_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index daeb22945..1d9e8e9da 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -9,13 +9,14 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults @@ -322,7 +323,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}") createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { @@ -517,6 +517,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return triggerResult } + /** + * 1. Bulk index all findings based on shouldCreateFinding flag + * 2. invoke publishFinding() to kickstart auto-correlations + * 3. Returns a list of pairs for finding id to doc id + */ private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, @@ -559,26 +564,45 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexRequests += IndexRequest(monitor.dataSources.findingsIndex) .source(findingStr, XContentType.JSON) .id(finding.id) - .routing(finding.id) .opType(DocWriteRequest.OpType.CREATE) } } if (indexRequests.isNotEmpty()) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + + try { + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } + } catch (e: Exception) { + // suppress exception + logger.error("Optional finding callback failed", e) + } + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { + indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { - bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it) + bulk(BulkRequest().add(batch), it) } if (bulkResponse.hasFailures()) { bulkResponse.items.forEach { item -> if (item.isFailed) { - logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") } } } else { logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") } } - return findingDocPairs + monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } private fun publishFinding( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index f7e011a99..b93ecdc99 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -12,7 +15,9 @@ import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.TriggerAfterKey import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AggregationQueryRewriter +import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction @@ -21,6 +26,7 @@ import org.opensearch.alerting.util.getRoleFilterEnabled import org.opensearch.alerting.util.use import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.service.ClusterService import org.opensearch.common.io.stream.BytesStreamOutput import org.opensearch.common.settings.Settings @@ -44,6 +50,8 @@ import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + /** Service that handles the collection of input results for Monitor executions */ class InputService( val client: Client, @@ -101,7 +109,10 @@ class InputService( .newInstance(searchParams) .execute() + val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService) val searchRequest = SearchRequest() + .indices(*indexes.toTypedArray()) + .preference(Preference.PRIMARY_FIRST.type()) .indices(*resolveIndices(monitor, input.indices).toTypedArray()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) @@ -115,9 +126,36 @@ class InputService( results += searchResponse.convertToMap() } is ClusterMetricsInput -> { - logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}") - val response = executeTransportAction(input, client) - results += response.toMap() + logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) + + val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + + val responseMap = mutableMapOf>() + if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { + client.threadPool().threadContext.stashContext().use { + scope.launch { + input.clusters.forEach { cluster -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + responseMap[cluster] = response.toMap() + } + } + } + val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT) + val startTime = Instant.now().toEpochMilli() + while ( + (Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) || + (responseMap.size < input.clusters.size) + ) { /* Wait for responses */ } + results += responseMap + } else { + val response = executeTransportAction(input, client) + results += response.toMap() + } } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 9bd923e22..109d4def9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.get.GetIndexRequest @@ -78,35 +79,51 @@ object MonitorMetadataService : @Suppress("ComplexMethod", "ReturnCount") suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata { try { - val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .routing(metadata.monitorId) - .setIfSeqNo(metadata.seqNo) - .setIfPrimaryTerm(metadata.primaryTerm) - .timeout(indexTimeout) + if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source( + metadata.toXContent( + XContentFactory.jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) + .id(metadata.id) + .routing(metadata.monitorId) + .setIfSeqNo(metadata.seqNo) + .setIfPrimaryTerm(metadata.primaryTerm) + .timeout(indexTimeout) - if (updating) { - indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) - } else { - indexRequest.opType(DocWriteRequest.OpType.CREATE) - } - val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } - when (response.result) { - DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { - val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" - log.error(failureReason) - throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + if (updating) { + indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) } - DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { - log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = + "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException( + failureReason, + RestStatus.INTERNAL_SERVER_ERROR, + IllegalStateException(failureReason) + ) + } + + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + } } + return metadata.copy( + seqNo = response.seqNo, + primaryTerm = response.primaryTerm + ) + } else { + val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata" + throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR) } - return metadata.copy( - seqNo = response.seqNo, - primaryTerm = response.primaryTerm - ) } catch (e: Exception) { log.error("Failed to upsert metadata", e) throw AlertingException.wrap(e) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 3b17ceebe..2e72af40b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -49,5 +49,6 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index b8719b4b0..fe8e94734 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -22,8 +22,10 @@ import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT @@ -175,6 +177,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index 691071517..a77121069 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Alert @@ -65,7 +66,21 @@ object QueryLevelMonitorRunner : MonitorRunner() { for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) - val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + val triggerResult = when (monitor.monitorType) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> + monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + val remoteMonitoringEnabled = + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (remoteMonitoringEnabled) + monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) + else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + } + else -> + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") + } + triggerResults[trigger.id] = triggerResult if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index f2356eddf..21ba32475 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext @@ -16,8 +18,10 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser +import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.cluster.service.ClusterService import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH import org.opensearch.commons.alerting.model.AggregationResultBucket @@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) { } } + fun runClusterMetricsTrigger( + monitor: Monitor, + trigger: QueryLevelTrigger, + ctx: QueryLevelTriggerExecutionContext, + clusterService: ClusterService + ): ClusterMetricsTriggerRunResult { + var runResult: ClusterMetricsTriggerRunResult? + try { + val inputResults = ctx.results.getOrElse(0) { mapOf() } + var triggered = false + val clusterTriggerResults = mutableListOf() + if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) { + inputResults.forEach { clusterResult -> + // Reducing the inputResults to only include results from 1 cluster at a time + val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair()))) + + val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(clusterTriggerCtx) + + if (clusterTriggered) { + triggered = clusterTriggered + clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered)) + } + } + } else { + triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(ctx) + if (triggered) clusterTriggerResults + .add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered)) + } + runResult = ClusterMetricsTriggerRunResult( + triggerName = trigger.name, + triggered = triggered, + error = null, + clusterTriggerResults = clusterTriggerResults + ) + } catch (e: Exception) { + logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) + // if the script fails we need to send an alert so set triggered = true + runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e) + } + return runResult!! + } + // TODO: improve performance and support match all and match any fun runDocLevelTrigger( monitor: Monitor, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt new file mode 100644 index 000000000..059110af4 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionType + +class GetRemoteIndexesAction private constructor() : ActionType(NAME, ::GetRemoteIndexesResponse) { + companion object { + val INSTANCE = GetRemoteIndexesAction() + const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt new file mode 100644 index 000000000..733bc3a04 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import java.io.IOException + +class GetRemoteIndexesRequest : ActionRequest { + var indexes: List = listOf() + var includeMappings: Boolean + + constructor(indexes: List, includeMappings: Boolean) : super() { + this.indexes = indexes + this.includeMappings = includeMappings + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readStringList(), + sin.readBoolean() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeStringArray(indexes.toTypedArray()) + out.writeBoolean(includeMappings) + } + + companion object { + const val INDEXES_FIELD = "indexes" + const val INCLUDE_MAPPINGS_FIELD = "include_mappings" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt new file mode 100644 index 000000000..1572b4228 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.cluster.health.ClusterHealthStatus +import org.opensearch.cluster.metadata.MappingMetadata +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { + var clusterIndexes: List = emptyList() + + constructor(clusterIndexes: List) : super() { + this.clusterIndexes = clusterIndexes + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + clusterIndexes = sin.readList((ClusterIndexes.Companion)::readFrom) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + clusterIndexes.forEach { + it.toXContent(builder, params) + } + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + clusterIndexes.forEach { it.writeTo(out) } + } + + data class ClusterIndexes( + val clusterName: String, + val clusterHealth: ClusterHealthStatus, + val hubCluster: Boolean, + val indexes: List = listOf(), + val latency: Long + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + clusterName = sin.readString(), + clusterHealth = sin.readEnum(ClusterHealthStatus::class.java), + hubCluster = sin.readBoolean(), + indexes = sin.readList((ClusterIndex.Companion)::readFrom), + latency = sin.readLong() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject(clusterName) + builder.field(CLUSTER_NAME_FIELD, clusterName) + builder.field(CLUSTER_HEALTH_FIELD, clusterHealth) + builder.field(HUB_CLUSTER_FIELD, hubCluster) + builder.field(INDEX_LATENCY_FIELD, latency) + builder.startObject(INDEXES_FIELD) + indexes.forEach { + it.toXContent(builder, params) + } + return builder.endObject().endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(clusterName) + out.writeEnum(clusterHealth) + indexes.forEach { it.writeTo(out) } + out.writeLong(latency) + } + + companion object { + const val CLUSTER_NAME_FIELD = "cluster" + const val CLUSTER_HEALTH_FIELD = "health" + const val HUB_CLUSTER_FIELD = "hub_cluster" + const val INDEXES_FIELD = "indexes" + const val INDEX_LATENCY_FIELD = "latency" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterIndexes { + return ClusterIndexes(sin) + } + } + + data class ClusterIndex( + val indexName: String, + val indexHealth: ClusterHealthStatus?, + val mappings: MappingMetadata? + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + indexName = sin.readString(), + indexHealth = sin.readEnum(ClusterHealthStatus::class.java), + mappings = sin.readOptionalWriteable(::MappingMetadata) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject(indexName) + builder.field(INDEX_NAME_FIELD, indexName) + builder.field(INDEX_HEALTH_FIELD, indexHealth) + if (mappings == null) builder.startObject(MAPPINGS_FIELD).endObject() + else builder.field(MAPPINGS_FIELD, mappings.sourceAsMap()) + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(indexName) + out.writeEnum(indexHealth) + if (mappings != null) out.writeMap(mappings.sourceAsMap) + } + + companion object { + const val INDEX_NAME_FIELD = "name" + const val INDEX_HEALTH_FIELD = "health" + const val MAPPINGS_FIELD = "mappings" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterIndex { + return ClusterIndex(sin) + } + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt new file mode 100644 index 000000000..a19de0637 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ClusterMetricsTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, + override var actionResults: MutableMap = mutableMapOf(), + var clusterTriggerResults: List = listOf() +) : QueryLevelTriggerRunResult( + triggerName = triggerName, + error = error, + triggered = triggered, + actionResults = actionResults +) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + clusterTriggerResults = sin.readList((ClusterTriggerResult.Companion)::readFrom) + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + builder + .field(TRIGGERED_FIELD, triggered) + .field(ACTION_RESULTS_FIELD, actionResults as Map) + .startArray(CLUSTER_RESULTS_FIELD) + clusterTriggerResults.forEach { it.toXContent(builder, params) } + return builder.endArray() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + clusterTriggerResults.forEach { it.writeTo(out) } + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + const val ACTION_RESULTS_FIELD = "action_results" + const val CLUSTER_RESULTS_FIELD = "cluster_results" + } + + data class ClusterTriggerResult( + val cluster: String, + val triggered: Boolean, + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + cluster = sin.readString(), + triggered = sin.readBoolean() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(cluster) + .field(TRIGGERED_FIELD, triggered) + .endObject() + .endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(cluster) + out.writeBoolean(triggered) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterTriggerResult { + return ClusterTriggerResult(sin) + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt index d123dbae4..5917c1ecf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt @@ -14,11 +14,11 @@ import org.opensearch.script.ScriptException import java.io.IOException import java.time.Instant -data class QueryLevelTriggerRunResult( +open class QueryLevelTriggerRunResult( override var triggerName: String, - var triggered: Boolean, + open var triggered: Boolean, override var error: Exception?, - var actionResults: MutableMap = mutableMapOf() + open var actionResults: MutableMap = mutableMapOf() ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt new file mode 100644 index 000000000..591ab2c3e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.action.GetRemoteIndexesAction +import org.opensearch.alerting.action.GetRemoteIndexesRequest +import org.opensearch.client.node.NodeClient +import org.opensearch.core.common.Strings +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener + +private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java) + +class RestGetRemoteIndexesAction : BaseRestHandler() { + val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + + override fun getName(): String { + return "get_remote_indexes_action" + } + + override fun routes(): List { + return mutableListOf( + RestHandler.Route(RestRequest.Method.GET, ROUTE) + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} $ROUTE") + val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, "")) + val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false) + return RestChannelConsumer { + channel -> + client.execute( + GetRemoteIndexesAction.INSTANCE, + GetRemoteIndexesRequest( + indexes = indexes.toList(), + includeMappings = includeMappings + ), + RestToXContentListener(channel) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 1bf2dc663..63548681a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,7 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -185,6 +186,20 @@ class AlertingSettings { "plugins.alerting.max_actionable_alert_count", DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, -1L, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ) + + val REMOTE_MONITORING_ENABLED = Setting.boolSetting( + "plugins.alerting.remote_monitoring_enabled", + false, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 1, Setting.Property.NodeScope, Setting.Property.Dynamic ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt new file mode 100644 index 000000000..5b35d493a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt @@ -0,0 +1,193 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse +import org.opensearch.action.admin.indices.resolve.ResolveIndexAction +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.IndicesOptions +import org.opensearch.alerting.action.GetRemoteIndexesAction +import org.opensearch.alerting.action.GetRemoteIndexesRequest +import org.opensearch.alerting.action.GetRemoteIndexesResponse +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.CrossClusterMonitorUtils +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.time.Duration +import java.time.Instant + +private val log = LogManager.getLogger(TransportGetRemoteIndexesAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportGetRemoteIndexesAction @Inject constructor( + val transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val clusterService: ClusterService, + settings: Settings, +) : HandledTransportAction( + GetRemoteIndexesAction.NAME, + transportService, + actionFilters, + ::GetRemoteIndexesRequest +), + SecureTransportAction { + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + @Volatile private var remoteMonitoringEnabled = REMOTE_MONITORING_ENABLED.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(REMOTE_MONITORING_ENABLED) { remoteMonitoringEnabled = it } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute( + task: Task, + request: GetRemoteIndexesRequest, + actionListener: ActionListener + ) { + log.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (!remoteMonitoringEnabled) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Remote monitoring is not enabled.", RestStatus.FORBIDDEN) + ) + ) + return + } + + val user = readUserFromThreadContext(client) + if (!validateUserBackendRoles(user, actionListener)) return + + client.threadPool().threadContext.stashContext().use { + scope.launch { + val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread") + withContext(singleThreadContext) { + it.restore() + val clusterIndexesList = mutableListOf() + + var resolveIndexResponse: ResolveIndexAction.Response? = null + try { + resolveIndexResponse = + getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService)) + } catch (e: Exception) { + log.error("Failed to retrieve indexes for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + + val resolvedIndexes: MutableList = mutableListOf() + if (resolveIndexResponse != null) { + resolveIndexResponse.indices.forEach { resolvedIndexes.add(it.name) } + resolveIndexResponse.aliases.forEach { resolvedIndexes.add(it.name) } + } + + val clusterIndexesMap = CrossClusterMonitorUtils.separateClusterIndexes(resolvedIndexes, clusterService) + + clusterIndexesMap.forEach { (clusterName, indexes) -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(clusterName, client, clusterService) + + val startTime = Instant.now() + var clusterHealthResponse: ClusterHealthResponse? = null + try { + clusterHealthResponse = getHealthStatuses(targetClient, indexes) + } catch (e: Exception) { + log.error("Failed to retrieve health statuses for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + val endTime = Instant.now() + val latency = Duration.between(startTime, endTime).toMillis() + + var mappingsResponse: GetMappingsResponse? = null + if (request.includeMappings) { + try { + mappingsResponse = getIndexMappings(targetClient, indexes) + } catch (e: Exception) { + log.error("Failed to retrieve mappings for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + } + + val clusterIndexList = mutableListOf() + if (clusterHealthResponse != null) { + indexes.forEach { + clusterIndexList.add( + ClusterIndex( + indexName = it, + indexHealth = clusterHealthResponse.indices[it]?.status, + mappings = mappingsResponse?.mappings?.get(it) + ) + ) + } + } + + clusterIndexesList.add( + ClusterIndexes( + clusterName = clusterName, + clusterHealth = clusterHealthResponse!!.status, + hubCluster = clusterName == clusterService.clusterName.value(), + indexes = clusterIndexList, + latency = latency + ) + ) + } + actionListener.onResponse(GetRemoteIndexesResponse(clusterIndexes = clusterIndexesList)) + } + } + } + } + + private suspend fun getRemoteClusters(parsedIndexes: List): ResolveIndexAction.Response { + val resolveRequest = ResolveIndexAction.Request( + parsedIndexes.toTypedArray(), + ResolveIndexAction.Request.DEFAULT_INDICES_OPTIONS + ) + + return client.suspendUntil { + admin().indices().resolveIndex(resolveRequest, it) + } + } + private suspend fun getHealthStatuses(targetClient: Client, parsedIndexesNames: List): ClusterHealthResponse { + val clusterHealthRequest = ClusterHealthRequest() + .indices(*parsedIndexesNames.toTypedArray()) + .indicesOptions(IndicesOptions.lenientExpandHidden()) + + return targetClient.suspendUntil { + admin().cluster().health(clusterHealthRequest, it) + } + } + + private suspend fun getIndexMappings(targetClient: Client, parsedIndexNames: List): GetMappingsResponse { + val getMappingsRequest = GetMappingsRequest().indices(*parsedIndexNames.toTypedArray()) + return targetClient.suspendUntil { + admin().indices().getMappings(getMappingsRequest, it) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt new file mode 100644 index 000000000..6ec14ffa2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt @@ -0,0 +1,231 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.action.search.SearchRequest +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.SearchInput + +class CrossClusterMonitorUtils { + companion object { + + /** + * Uses the monitor inputs to determine whether the monitor makes calls to remote clusters. + * @param monitor The monitor to evaluate. + * @param localClusterName The name of the local cluster. + * @return TRUE if the monitor makes calls to remote clusters; otherwise returns FALSE. + */ + @JvmStatic + fun isRemoteMonitor(monitor: Monitor, localClusterName: String): Boolean { + var isRemoteMonitor = false + monitor.inputs.forEach inputCheck@{ + when (it) { + is ClusterMetricsInput -> { + it.clusters.forEach { clusterName -> + if (clusterName != localClusterName) { + isRemoteMonitor = true + return@inputCheck + } + } + } + is SearchInput -> { + // Remote indexes follow the pattern ":". + // Index entries without a CLUSTER_NAME indicate they're store on the local cluster. + it.indices.forEach { index -> + val clusterName = parseClusterName(index) + if (clusterName != localClusterName) { + isRemoteMonitor = true + return@inputCheck + } + } + } + is DocLevelMonitorInput -> { + // TODO: When document level monitors are supported, this check will be similar to SearchInput. + throw IllegalArgumentException("Per document monitors do not currently support cross-cluster search.") + } + else -> { + throw IllegalArgumentException("Unsupported input type: ${it.name()}.") + } + } + } + return isRemoteMonitor + } + + /** + * Uses the monitor inputs to determine whether the monitor makes calls to remote clusters. + * @param monitor The monitor to evaluate. + * @param clusterService Used to retrieve the name of the local cluster. + * @return TRUE if the monitor makes calls to remote clusters; otherwise returns FALSE. + */ + @JvmStatic + fun isRemoteMonitor(monitor: Monitor, clusterService: ClusterService): Boolean { + return isRemoteMonitor(monitor = monitor, localClusterName = clusterService.clusterName.value()) + } + + /** + * Parses the list of indexes into a map of CLUSTER_NAME to List. + * @param indexes A list of index names in ":" format. + * @param localClusterName The name of the local cluster. + * @return A map of CLUSTER_NAME to List + */ + @JvmStatic + fun separateClusterIndexes(indexes: List, localClusterName: String): HashMap> { + val output = hashMapOf>() + indexes.forEach { index -> + var clusterName = parseClusterName(index) + val indexName = parseIndexName(index) + + // If the index entry does not have a CLUSTER_NAME, it indicates the index is on the local cluster. + if (clusterName.isEmpty()) clusterName = localClusterName + + output.getOrPut(clusterName) { mutableListOf() }.add(indexName) + } + return output + } + + /** + * Parses the list of indexes into a map of CLUSTER_NAME to List. + * @param indexes A list of index names in ":" format. + * Local indexes can also be in "" format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A map of CLUSTER_NAME to List + */ + @JvmStatic + fun separateClusterIndexes(indexes: List, clusterService: ClusterService): HashMap> { + return separateClusterIndexes(indexes = indexes, localClusterName = clusterService.clusterName.value()) + } + + /** + * The [NodeClient] used by the plugin cannot execute searches against local indexes + * using format ":". That format only supports querying remote indexes. + * This function formats a list of indexes to be supplied directly to a [SearchRequest]. + * @param indexes A list of index names in ":" format. + * @param localClusterName The name of the local cluster. + * @return A list of indexes with any remote indexes in ":" format, + * and any local indexes in "" format. + */ + @JvmStatic + fun parseIndexesForRemoteSearch(indexes: List, localClusterName: String): List { + return indexes.map { + var index = it + val clusterName = parseClusterName(it) + if (clusterName.isNotEmpty() && clusterName == localClusterName) { + index = parseIndexName(it) + } + index + } + } + + /** + * The [NodeClient] used by the plugin cannot execute searches against local indexes + * using format ":". That format only supports querying remote indexes. + * This function formats a list of indexes to be supplied directly to a [SearchRequest]. + * @param indexes A list of index names in ":" format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A list of indexes with any remote indexes in ":" format, + * and any local indexes in "" format. + */ + @JvmStatic + fun parseIndexesForRemoteSearch(indexes: List, clusterService: ClusterService): List { + return parseIndexesForRemoteSearch(indexes = indexes, localClusterName = clusterService.clusterName.value()) + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local [NodeClient]. + * @param localClusterName The name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForCluster(clusterName: String, client: Client, localClusterName: String): Client { + return if (clusterName == localClusterName) client else client.getRemoteClusterClient(clusterName) + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local [NodeClient]. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForCluster(clusterName: String, client: Client, clusterService: ClusterService): Client { + return getClientForCluster(clusterName = clusterName, client = client, localClusterName = clusterService.clusterName.value()) + } + + /** + * Uses the index name to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @param client The local [NodeClient]. + * @param localClusterName The name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForIndex(index: String, client: Client, localClusterName: String): Client { + val clusterName = parseClusterName(index) + return if (clusterName.isNotEmpty() && clusterName != localClusterName) + client.getRemoteClusterClient(clusterName) else client + } + + /** + * Uses the index name to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @param client The local [NodeClient]. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForIndex(index: String, client: Client, clusterService: ClusterService): Client { + return getClientForIndex(index = index, client = client, localClusterName = clusterService.clusterName.value()) + } + + /** + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @return The cluster name if present; else an empty string. + */ + @JvmStatic + fun parseClusterName(index: String): String { + return if (index.contains(":")) index.split(":").getOrElse(0) { "" } + else "" + } + + /** + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @return The index name. + */ + @JvmStatic + fun parseIndexName(index: String): String { + return if (index.contains(":")) index.split(":").getOrElse(1) { index } + else index + } + + /** + * If clusterName is provided, combines the inputs into ":" format. + * @param clusterName + * @param indexName + * @return The formatted string. + */ + @JvmStatic + fun formatClusterAndIndexName(clusterName: String, indexName: String): String { + return if (clusterName.isNotEmpty()) "$clusterName:$indexName" + else indexName + } + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index 53fb5b0a2..76e5104cc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -169,6 +169,14 @@ "type": "text" } } + }, + "clusters": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 943ad11a6..479e29dca 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -393,6 +393,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) } + fun `test execute monitor for bulk index findings`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + for (i in 0 until 9) { + indexDoc(testIndex, i.toString(), testDoc) + } + indexDoc(testIndex2, "3", testDoc) + adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Correct search result", 10, matchingDocsToQuery.size) + assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 10, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 10, findings.size) + val foundFindings = + findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + assertEquals("Found findings for all docs", 4, foundFindings.size) + } + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() { val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" val testQueryName = "wildcard-test-query" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt index 6851c471d..6ef15f8d8 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt @@ -40,7 +40,10 @@ class WriteableTests : OpenSearchTestCase() { runResult.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newRunResult = QueryLevelTriggerRunResult(sin) - assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + assertEquals(runResult.triggerName, newRunResult.triggerName) + assertEquals(runResult.triggered, newRunResult.triggered) + assertEquals(runResult.error, newRunResult.error) + assertEquals(runResult.actionResults, newRunResult.actionResults) } fun `test bucket-level triggerrunresult as stream`() { diff --git a/release-notes/opensearch-alerting.release-notes-2.12.0.0.md b/release-notes/opensearch-alerting.release-notes-2.12.0.0.md new file mode 100644 index 000000000..1ecdeadd7 --- /dev/null +++ b/release-notes/opensearch-alerting.release-notes-2.12.0.0.md @@ -0,0 +1,27 @@ +## Version 2.12.0.0 2024-02-06 +Compatible with OpenSearch 2.12.0 + +### Maintenance +* Increment version to 2.12.0-SNAPSHOT. ([#1239](https://github.com/opensearch-project/alerting/pull/1239)) +* Removed default admin credentials for alerting ([#1399](https://github.com/opensearch-project/alerting/pull/1399)) +* ipaddress lib upgrade as part of cve fix ([#1397](https://github.com/opensearch-project/alerting/pull/1397)) + +### Bug Fixes +* Don't attempt to parse workflow if it doesn't exist ([#1346](https://github.com/opensearch-project/alerting/pull/1346)) +* Set docData to empty string if actual is null ([#1325](https://github.com/opensearch-project/alerting/pull/1325)) + +### Enhancements +* Optimize doc-level monitor execution workflow for datastreams ([#1302](https://github.com/opensearch-project/alerting/pull/1302)) +* Inject namedWriteableRegistry during ser/deser of SearchMonitorAction ([#1382](https://github.com/opensearch-project/alerting/pull/1382)) +* Bulk index findings and sequentially invoke auto-correlations ([#1355](https://github.com/opensearch-project/alerting/pull/1355)) +* Implemented cross-cluster monitor support ([#1404](https://github.com/opensearch-project/alerting/pull/1404)) + +### Refactoring +* Reference get monitor and search monitor action / request / responses from common-utils ([#1315](https://github.com/opensearch-project/alerting/pull/1315)) + +### Infrastructure +* Fix workflow security tests. ([#1310](https://github.com/opensearch-project/alerting/pull/1310)) +* Upgrade to Gradle 8.5 ([#1369](https://github.com/opensearch-project/alerting/pull/1369)) + +### Documentation +* Added 2.12 release notes ([#1408](https://github.com/opensearch-project/alerting/pull/1408)) \ No newline at end of file diff --git a/scripts/build.sh b/scripts/build.sh index 5a173adfb..9f0afc3da 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -74,6 +74,7 @@ distributions="$(dirname "${zipPath}")" echo "COPY ${distributions}/*.zip" cp ${distributions}/*.zip ./$OUTPUT/plugins +./gradlew publishToMavenLocal -Dopensearch.version=$VERSION -Dbuild.snapshot=$SNAPSHOT -Dbuild.version_qualifier=$QUALIFIER ./gradlew publishPluginZipPublicationToZipStagingRepository -Dopensearch.version=$VERSION -Dbuild.snapshot=$SNAPSHOT -Dbuild.version_qualifier=$QUALIFIER mkdir -p $OUTPUT/maven/org/opensearch cp -r ./build/local-staging-repo/org/opensearch/. $OUTPUT/maven/org/opensearch