Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…into percolate_per_shard
  • Loading branch information
eirsep committed Feb 17, 2024
2 parents ad57b56 + 5b3e1a2 commit 924aed6
Show file tree
Hide file tree
Showing 28 changed files with 1,111 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @lezzago @AWSHurneyt @sbcd90 @eirsep @getsaurabh02 @praveensameneni @qreshi @bowenlan-amzn @rishabhmaurya
* @lezzago @AWSHurneyt @sbcd90 @eirsep @getsaurabh02 @praveensameneni @qreshi @bowenlan-amzn @rishabhmaurya @engechas
8 changes: 4 additions & 4 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,20 @@ jobs:
if: env.imagePresent == 'true'
run: |
cd ..
docker run -p 9200:9200 -d -p 9600:9600 -e "discovery.type=single-node" opensearch-alerting:test
docker run -p 9200:9200 -d -p 9600:9600 -e "OPENSEARCH_INITIAL_ADMIN_PASSWORD=myStrongPassword123!" -e "discovery.type=single-node" opensearch-alerting:test
sleep 120
- name: Run Alerting Test for security enabled test cases
if: env.imagePresent == 'true'
run: |
cluster_running=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure`
cluster_running=`curl -XGET https://localhost:9200/_cat/plugins -u admin:myStrongPassword123! --insecure`
echo $cluster_running
security=`curl -XGET https://localhost:9200/_cat/plugins -u admin:admin --insecure |grep opensearch-security|wc -l`
security=`curl -XGET https://localhost:9200/_cat/plugins -u admin:myStrongPassword123! --insecure |grep opensearch-security|wc -l`
echo $security
if [ $security -gt 0 ]
then
echo "Security plugin is available"
./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=docker-cluster -Dsecurity=true -Dhttps=true -Duser=admin -Dpassword=admin
./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=docker-cluster -Dsecurity=true -Dhttps=true -Duser=admin -Dpassword=myStrongPassword123!
else
echo "Security plugin is NOT available skipping this run as tests without security have already been run"
fi
4 changes: 2 additions & 2 deletions DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ When launching a cluster using one of the above commands, logs are placed in `al

1. Setup a local opensearch cluster with security plugin.

- `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=admin`
- `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=<admin-password>`

- `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=admin --tests "org.opensearch.alerting.MonitorRunnerIT.test execute monitor returns search result"`
- `./gradlew :alerting:integTest -Dtests.rest.cluster=localhost:9200 -Dtests.cluster=localhost:9200 -Dtests.clustername=opensearch -Dhttps=true -Dsecurity=true -Duser=admin -Dpassword=<admin-password> --tests "org.opensearch.alerting.MonitorRunnerIT.test execute monitor returns search result"`


#### Building from the IDE
Expand Down
3 changes: 2 additions & 1 deletion MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
## Current Maintainers

| Maintainer | GitHub ID | Affiliation |
|----------------------| ------------------------------------------------- | ----------- |
|----------------------| ------------------------------------------------- |-------------|
| Ashish Agrawal | [lezzago](https://github.com/lezzago) | Amazon |
| Mohammad Qureshi | [qreshi](https://github.com/qreshi) | Amazon |
| Bowen Lan | [bowenlan-amzn](https://github.com/bowenlan-amzn) | Amazon |
Expand All @@ -15,6 +15,7 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Surya Sashank Nistala | [eirsep](https://github.com/eirsep) | Amazon |
| Thomas Hurney | [AWSHurneyt](https://github.com/AWSHurneyt) | Amazon |
| Praveen Sameneni | [praveensameneni](https://github.com/praveensameneni) | Amazon |
| Chase Engelbrecht | [engechas](https://github.com/engechas) | Amazon |

## Emeritus

Expand Down
2 changes: 1 addition & 1 deletion alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ dependencies {
implementation "org.jetbrains:annotations:13.0"

api project(":alerting-core")
implementation "com.github.seancfoley:ipaddress:5.3.3"
implementation "com.github.seancfoley:ipaddress:5.4.1"

testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}"
testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand Down
21 changes: 19 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -190,6 +191,19 @@ class AlertService(
)
}

// Including a list of triggered clusters for cluster metrics monitors
var triggeredClusters: MutableList<String>? = null
if (result is ClusterMetricsTriggerRunResult)
result.clusterTriggerResults.forEach {
if (it.triggered) {
// Add an empty list if one isn't already present
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()

// Add the cluster to the list of triggered clusters
triggeredClusters!!.add(it.cluster)
}
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -199,7 +213,8 @@ class AlertService(
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -212,6 +227,7 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand All @@ -223,7 +239,8 @@ class AlertService(
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
workflowId = workflorwRunContext?.workflowId ?: "",
clusters = triggeredClusters
)
}
}
Expand Down
14 changes: 11 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
Expand All @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
Expand All @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
Expand Down Expand Up @@ -133,6 +136,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val UI_METADATA_EXCLUDE = arrayOf("monitor.${Monitor.UI_METADATA_FIELD}")
@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
@JvmField val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
Expand Down Expand Up @@ -184,7 +188,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
)
}

Expand All @@ -211,7 +216,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
)
}

Expand Down Expand Up @@ -351,7 +357,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
Expand Down Expand Up @@ -322,7 +323,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
logger.error("PERF_DEBUG: Creating ${docsToQueries.size} findings for monitor ${monitor.id}")
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
Expand Down Expand Up @@ -517,6 +517,11 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return triggerResult
}

/**
* 1. Bulk index all findings based on shouldCreateFinding flag
* 2. invoke publishFinding() to kickstart auto-correlations
* 3. Returns a list of pairs for finding id to doc id
*/
private suspend fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
Expand Down Expand Up @@ -559,26 +564,45 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
.opType(DocWriteRequest.OpType.CREATE)
}
}

if (indexRequests.isNotEmpty()) {
bulkIndexFindings(monitor, monitorCtx, indexRequests)
}

try {
findings.forEach { finding ->
publishFinding(monitor, monitorCtx, finding)
}
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return findingDocPairs
}

private suspend fun bulkIndexFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
indexRequests: List<IndexRequest>
) {
indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
bulk(BulkRequest().add(batch), it)
}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}
return findingDocPairs
monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex))
}

private fun publishFinding(
Expand Down
Loading

0 comments on commit 924aed6

Please sign in to comment.