Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] integrate security-analytics & alerting for correlation engine #880

Merged
merged 1 commit into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
Expand All @@ -26,12 +27,16 @@ import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand Down Expand Up @@ -342,6 +347,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
Expand All @@ -363,9 +369,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx.client!!.index(indexRequest, it)
}
}

try {
publishFinding(monitor, monitorCtx, finding)
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return finding.id
}

private fun publishFinding(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
finding: Finding
) {
val publishFindingsRequest = PublishFindingsRequest(monitor.id, finding)
AlertingPluginInterface.publishFinding(
monitorCtx.client!! as NodeClient,
publishFindingsRequest,
object : ActionListener<SubscribeFindingsResponse> {
override fun onResponse(response: SubscribeFindingsResponse) {}

override fun onFailure(e: Exception) {}
}
)
}

private suspend fun updateLastRunContext(
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"dynamic": "strict",
"_meta" : {
"schema_version": 1
"schema_version": 2
},
"properties": {
"schema_version": {
Expand Down Expand Up @@ -51,6 +51,15 @@
},
"timestamp": {
"type": "long"
},
"correlated_doc_ids": {
"type" : "text",
"analyzer": "whitespace",
"fields" : {
"keyword" : {
"type" : "keyword"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AlertIndicesIT : AlertingRestTestCase() {

putFindingMappings(
AlertIndices.findingMapping().trimStart('{').trimEnd('}')
.replace("\"schema_version\": 1", "\"schema_version\": 0")
.replace("\"schema_version\": 2", "\"schema_version\": 0")
)
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 0)
Expand All @@ -89,7 +89,7 @@ class AlertIndicesIT : AlertingRestTestCase() {
executeMonitor(trueMonitor.id)
assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX)
verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 6)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 1)
verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 2)
}

fun `test alert index gets recreated automatically if deleted`() {
Expand Down