diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index d03db678f..bdcf8263b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -11,10 +11,13 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.action.ActionListener import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.moveAlerts import org.opensearch.alerting.core.JobRunner +import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry @@ -29,6 +32,7 @@ import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings import org.opensearch.alerting.util.DocLevelMonitorQueries +import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.isDocLevelMonitor import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexNameExpressionResolver @@ -223,6 +227,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean): MonitorRunResult<*> { + + // Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping + // has not been updated. + if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { + IndexUtils.updateIndexMapping( + ScheduledJob.SCHEDULED_JOBS_INDEX, + ScheduledJobIndices.scheduledJobMappings(), monitorCtx.clusterService!!.state(), monitorCtx.client!!.admin().indices(), + object : ActionListener { + override fun onResponse(response: AcknowledgedResponse) { + } + override fun onFailure(t: Exception) { + logger.error("Failed to update config index schema", t) + } + } + ) + } + val monitor = job as Monitor val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun)