Skip to content

Commit

Permalink
Finding Index rollover (#408)
Browse files Browse the repository at this point in the history
* Finding Index rollover

Signed-off-by: jiahe zhang <[email protected]>

* apply fixes to make rollover work

Signed-off-by: Ashish Agrawal <[email protected]>

Co-authored-by: jiahe zhang <[email protected]>
  • Loading branch information
lezzago and charliezhangaws authored Apr 18, 2022
1 parent 8e991f6 commit 1cc813f
Show file tree
Hide file tree
Showing 26 changed files with 620 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ class AlertService(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isHistoryEnabled()) {
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroDestinationSettings.EMAIL_USERNAME,
LegacyOpenDistroDestinationSettings.EMAIL_PASSWORD,
LegacyOpenDistroDestinationSettings.ALLOW_LIST,
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST,
AlertingSettings.FINDING_HISTORY_ENABLED,
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object BucketLevelMonitorRunner : MonitorRunner {
var monitorResult = MonitorRunResult<BucketLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertService!!.loadCurrentAlertsForBucketLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices.Companion.FINDING_HISTORY_WRITE_INDEX
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.DocLevelQuery
import org.opensearch.alerting.core.model.ScheduledJob
Expand Down Expand Up @@ -67,6 +68,18 @@ object DocumentReturningMonitorRunner : MonitorRunner {
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.info("Document-level-monitor is running ...")
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)

// TODO: is this needed from Charlie?
try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex()
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error loading alerts for monitor: $id", e)
return monitorResult.copy(error = e)
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -246,7 +259,7 @@ object DocumentReturningMonitorRunner : MonitorRunner {
): String {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = matchingDocIds.joinToString(","),
relatedDocIds = matchingDocIds,
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
Expand All @@ -259,9 +272,11 @@ object DocumentReturningMonitorRunner : MonitorRunner {
logger.info("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
val indexRequest = IndexRequest(FINDING_HISTORY_WRITE_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)

monitorCtx.client!!.index(indexRequest).actionGet()
return finding.id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertIndices!!.isInitialized()) {
if (monitorCtx.alertIndices!!.isAlertInitialized()) {
moveAlerts(monitorCtx.client!!, job.id, job)
}
}
Expand All @@ -189,7 +189,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertIndices!!.isInitialized()) {
if (monitorCtx.alertIndices!!.isAlertInitialized()) {
moveAlerts(monitorCtx.client!!, jobId, null)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object QueryLevelMonitorRunner : MonitorRunner {
var monitorResult = MonitorRunResult<QueryLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
val currentAlerts = try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialHistoryIndex()
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex()
monitorCtx.alertService!!.loadCurrentAlertsForQueryLevelMonitor(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down
Loading

0 comments on commit 1cc813f

Please sign in to comment.