From 5298a14a16e41c4525f361f0477e9ebb1f27976c Mon Sep 17 00:00:00 2001 From: Ashish Agrawal Date: Mon, 17 Apr 2023 17:37:04 -0700 Subject: [PATCH] Update config index schema if needed at the start of each monitor execution (#849) * Update config index schema if needed at the start of each monitor execution Signed-off-by: Ashish Agrawal --- .../alerting/MonitorRunnerService.kt | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 98b4d1581..31dfa445b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -11,10 +11,13 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.action.ActionListener import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.moveAlerts import org.opensearch.alerting.core.JobRunner +import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.model.ScheduledJob import org.opensearch.alerting.model.Alert import org.opensearch.alerting.model.Monitor @@ -33,6 +36,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings import org.opensearch.alerting.util.DocLevelMonitorQueries +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.isBucketLevelMonitor import org.opensearch.alerting.util.isDocLevelMonitor import org.opensearch.client.Client @@ -224,6 +228,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): MonitorRunResult<*> { + + // Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping + // has not been updated. + if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { + IndexUtils.updateIndexMapping( + ScheduledJob.SCHEDULED_JOBS_INDEX, + ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(), + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + } + override fun onFailure(t: Exception) { + logger.error("Failed to update config index schema", t) + } + } + ) + } + val monitor = job as Monitor val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun)