Skip to content

Commit

Permalink
integrate security-analytics & alerting for correlation engine (#878) (
Browse files Browse the repository at this point in the history
…#880)

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Apr 19, 2023
1 parent 493139e commit a7b03d3
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
Expand All @@ -26,12 +27,16 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand Down Expand Up @@ -342,6 +347,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
Expand All @@ -363,9 +369,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx.client!!.index(indexRequest, it)
}
}

try {
publishFinding(monitor, monitorCtx, finding)
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return finding.id
}

private fun publishFinding(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
finding: Finding
) {
val publishFindingsRequest = PublishFindingsRequest(monitor.id, finding)
AlertingPluginInterface.publishFinding(
monitorCtx.client!! as NodeClient,
publishFindingsRequest,
object : ActionListener<SubscribeFindingsResponse> {
override fun onResponse(response: SubscribeFindingsResponse) {}

override fun onFailure(e: Exception) {}
}
)
}

private suspend fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": "strict",
"_meta" : {
"schema_version": 1
"schema_version": 2
},
"properties": {
"schema_version": {
Expand Down Expand Up @@ -51,6 +51,15 @@
},
"timestamp": {
"type": "long"
},
"correlated_doc_ids": {
"type" : "text",
"analyzer": "whitespace",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AlertIndicesIT : AlertingRestTestCase() {

putFindingMappings(
AlertIndices.findingMapping().trimStart('{').trimEnd('}')
.replace("\"schema_version\": 1", "\"schema_version\": 0")
.replace("\"schema_version\": 2", "\"schema_version\": 0")
)
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 0)
Expand All @@ -89,7 +89,7 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 1)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 2)
}

fun `test alert index gets recreated automatically if deleted`() {
Expand Down

0 comments on commit a7b03d3

Please sign in to comment.