Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Custom history indicies #621

Merged
merged 1 commit into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,9 @@ class AlertService(
retryPolicy: BackoffPolicy,
allowUpdatingAcknowledgedAlert: Boolean = false
) {
val alertIndex = dataSources.alertsIndex
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex

var requestsToRetry = alerts.flatMap { alert ->
// We don't want to set the version when saving alerts because the MonitorRunner has first priority when writing alerts.
// In the rare event that a user acknowledges an alert between when it's read and when it's written
Expand All @@ -293,7 +295,7 @@ class AlertService(
when (alert.state) {
Alert.State.ACTIVE, Alert.State.ERROR -> {
listOf<DocWriteRequest<*>>(
IndexRequest(alertIndex)
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -304,7 +306,7 @@ class AlertService(
// and updated by the MonitorRunner
if (allowUpdatingAcknowledgedAlert) {
listOf<DocWriteRequest<*>>(
IndexRequest(alertIndex)
IndexRequest(alertsIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(if (alert.id != Alert.NO_ID) alert.id else null)
Expand All @@ -318,11 +320,11 @@ class AlertService(
}
Alert.State.COMPLETED -> {
listOfNotNull<DocWriteRequest<*>>(
DeleteRequest(alertIndex, alert.id)
DeleteRequest(alertsIndex, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isAlertHistoryEnabled(dataSources)) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(alertsHistoryIndex)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
private val logger = LogManager.getLogger(javaClass)

var monitorCtx: MonitorRunnerExecutionContext = MonitorRunnerExecutionContext()

private lateinit var runnerSupervisor: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Default + runnerSupervisor
Expand Down Expand Up @@ -184,7 +183,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
launch {
try {
monitorCtx.moveAlertsRetryPolicy!!.retry(logger) {
if (monitorCtx.alertIndices!!.isAlertInitialized()) {
if (monitorCtx.alertIndices!!.isAlertInitialized(job.dataSources)) {
moveAlerts(monitorCtx.client!!, job.id, job)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,25 @@ class AlertIndices(
return alertIndexInitialized && alertHistoryIndexInitialized
}

fun isAlertHistoryEnabled(dataSources: DataSources): Boolean {
if (dataSources.alertsIndex == ALERT_INDEX) {
return alertHistoryEnabled
fun isAlertInitialized(dataSources: DataSources): Boolean {
val alertsIndex = dataSources.alertsIndex
val alertsHistoryIndex = dataSources.alertsHistoryIndex
if (alertsIndex == ALERT_INDEX && alertsHistoryIndex == ALERT_HISTORY_WRITE_INDEX) {
return alertIndexInitialized && alertHistoryIndexInitialized
}
if (
clusterService.state().metadata.indices.containsKey(alertsIndex) &&
clusterService.state().metadata.hasAlias(alertsHistoryIndex)
) {
return true
}
return false
}

fun isAlertHistoryEnabled(): Boolean {
return alertHistoryEnabled
}

fun isFindingHistoryEnabled(): Boolean = findingHistoryEnabled

suspend fun createOrUpdateAlertIndex() {
Expand Down Expand Up @@ -265,6 +277,19 @@ class AlertIndices(
if (dataSources.alertsIndex == ALERT_INDEX) {
return createOrUpdateInitialAlertHistoryIndex()
}
if (!clusterService.state().metadata.hasAlias(dataSources.alertsHistoryIndex)) {
createIndex(
dataSources.alertsHistoryIndexPattern ?: ALERT_HISTORY_INDEX_PATTERN,
alertMapping(),
dataSources.alertsHistoryIndex
)
} else {
updateIndexMapping(
dataSources.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX,
alertMapping(),
true
)
}
}
suspend fun createOrUpdateInitialAlertHistoryIndex() {
if (!alertHistoryIndexInitialized) {
Expand Down Expand Up @@ -300,13 +325,15 @@ class AlertIndices(
return createOrUpdateInitialFindingHistoryIndex()
}
val findingsIndex = dataSources.findingsIndex
val findingsIndexPattern = dataSources.findingsIndexPattern ?: FINDING_HISTORY_INDEX_PATTERN
if (!clusterService.state().routingTable().hasIndex(findingsIndex)) {
createIndex(
findingsIndex,
findingMapping()
findingsIndexPattern,
findingMapping(),
findingsIndex
)
} else {
updateIndexMapping(findingsIndex, findingMapping(), false)
updateIndexMapping(findingsIndex, findingMapping(), true)
}
}

Expand Down Expand Up @@ -339,6 +366,7 @@ class AlertIndices(
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index)
}

// TODO call getMapping and compare actual mappings here instead of this
if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import org.opensearch.search.builder.SearchSourceBuilder
* 1. Find active alerts:
* a. matching monitorId if no monitor is provided (postDelete)
* b. matching monitorId and no triggerIds if monitor is provided (postIndex)
* 2. Move alerts over to [ALERT_HISTORY_WRITE_INDEX] as DELETED
* 3. Delete alerts from [ALERT_INDEX]
* 2. Move alerts over to DataSources.alertsHistoryIndex as DELETED
* 3. Delete alerts from monitor's DataSources.alertsIndex
* 4. Schedule a retry if there were any failures
*/
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = null) {
suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor?) {
var alertIndex = monitor?.dataSources?.alertsIndex ?: ALERT_INDEX
var alertHistoryIndex = monitor?.dataSources?.alertsHistoryIndex ?: ALERT_HISTORY_WRITE_INDEX

val boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.MONITOR_ID_FIELD, monitorId))

Expand All @@ -53,15 +56,15 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
.query(boolQuery)
.version(true)

val activeAlertsRequest = SearchRequest(AlertIndices.ALERT_INDEX)
val activeAlertsRequest = SearchRequest(alertIndex)
.routing(monitorId)
.source(activeAlertsQuery)
val response: SearchResponse = client.suspendUntil { search(activeAlertsRequest, it) }

// If no alerts are found, simply return
if (response.hits.totalHits?.value == 0L) return
val indexRequests = response.hits.map { hit ->
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
IndexRequest(alertHistoryIndex)
.routing(monitorId)
.source(
Alert.parse(alertContentParser(hit.sourceRef), hit.id, hit.version)
Expand All @@ -76,7 +79,7 @@ suspend fun moveAlerts(client: Client, monitorId: String, monitor: Monitor? = nu
val copyResponse: BulkResponse = client.suspendUntil { bulk(copyRequest, it) }

val deleteRequests = copyResponse.items.filterNot { it.isFailed }.map {
DeleteRequest(AlertIndices.ALERT_INDEX, it.id)
DeleteRequest(alertIndex, it.id)
.routing(monitorId)
.version(it.version)
.versionType(VersionType.EXTERNAL_GTE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
}

private suspend fun onSearchResponse(response: SearchResponse, monitor: Monitor) {
val alertsHistoryIndex = monitor.dataSources.alertsHistoryIndex
val updateRequests = mutableListOf<UpdateRequest>()
val copyRequests = mutableListOf<IndexRequest>()
response.hits.forEach { hit ->
Expand Down Expand Up @@ -176,7 +177,7 @@ class TransportAcknowledgeAlertAction @Inject constructor(
)
updateRequests.add(updateRequest)
} else {
val copyRequest = IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
val copyRequest = IndexRequest(alertsHistoryIndex)
.routing(request.monitorId)
.id(alert.id)
.source(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.Table
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit.MILLIS
import java.util.concurrent.TimeUnit
import java.util.stream.Collectors

class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
Expand Down Expand Up @@ -465,6 +468,54 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}
}

fun `test search custom alerts history index`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val customAlertsIndex = "custom_alerts_index"
val customAlertsHistoryIndex = "custom_alerts_history_index"
val customAlertsHistoryIndexPattern = "<custom_alerts_history_index-{now/d}-1>"
var monitor = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger1, trigger2),
dataSources = DataSources(
alertsIndex = customAlertsIndex,
alertsHistoryIndex = customAlertsHistoryIndex,
alertsHistoryIndexPattern = customAlertsHistoryIndexPattern
)
)
val monitorResponse = createMonitor(monitor)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
assertFalse(monitorResponse?.id.isNullOrEmpty())
monitor = monitorResponse!!.monitor
indexDoc(index, "1", testDoc)
val monitorId = monitorResponse.id
val executeMonitorResponse = executeMonitor(monitor, monitorId, false)
var alertsBefore = searchAlerts(monitorId, customAlertsIndex)
Assert.assertEquals(2, alertsBefore.size)
Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name)
Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 2)
// Remove 1 trigger from monitor to force moveAlerts call to move alerts to history index
monitor = monitor.copy(triggers = listOf(trigger1))
updateMonitor(monitor, monitorId)

var alerts = listOf<Alert>()
OpenSearchTestCase.waitUntil({
alerts = searchAlerts(monitorId, customAlertsHistoryIndex)
if (alerts.size == 1) {
return@waitUntil true
}
return@waitUntil false
}, 30, TimeUnit.SECONDS)
assertEquals("Alerts from custom history index", 1, alerts.size)
}

fun `test get alerts by list of monitors containing both existent and non-existent ids`() {
val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3")
val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))
Expand Down