From d201e8a5341ea6ea9144adcabb20818fe6c5f1bc Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 31 Jul 2024 00:44:51 +0000 Subject: [PATCH] fix monitor renew lock issue Signed-off-by: Subhobrata Dey --- .github/workflows/security-test-workflow.yml | 1 + .../alerting/MonitorRunnerService.kt | 24 ++-- .../alerting/AlertingRestTestCase.kt | 13 +- .../alerting/DocumentMonitorRunnerIT.kt | 118 ++++++++++++++++++ .../alerting/core/lock/LockService.kt | 19 ++- 5 files changed, 161 insertions(+), 14 deletions(-) diff --git a/.github/workflows/security-test-workflow.yml b/.github/workflows/security-test-workflow.yml index a096f26a0..5ca093c4e 100644 --- a/.github/workflows/security-test-workflow.yml +++ b/.github/workflows/security-test-workflow.yml @@ -59,6 +59,7 @@ jobs: if docker pull opensearchstaging/opensearch:$docker_version then echo "FROM opensearchstaging/opensearch:$docker_version" >> Dockerfile + echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-security-analytics ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-security-analytics; fi" >> Dockerfile echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-alerting ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-alerting; fi" >> Dockerfile echo "ADD alerting/build/distributions/$plugin /tmp/" >> Dockerfile echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/$plugin" >> Dockerfile diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index e3147a2d9..cecd92269 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -331,12 +331,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { - var lock: LockModel? = null + var workflowLock: LockModel? = null try { - lock = monitorCtx.client!!.suspendUntil { + workflowLock = monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.acquireLock(job, it) } ?: return@launch - logger.debug("lock ${lock!!.lockId} acquired") + logger.debug("lock ${workflowLock.lockId} acquired") monitorCtx.client!!.suspendUntil { monitorCtx.client!!.execute( @@ -351,20 +351,22 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon it ) } + } catch (e: Exception) { + logger.error("Workflow run failed for workflow with id ${job.id}", e) } finally { - monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } - logger.debug("lock ${lock!!.lockId} released") + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(workflowLock, it) } + logger.debug("lock ${workflowLock?.lockId} released") } } } is Monitor -> { launch { - var lock: LockModel? = null + var monitorLock: LockModel? = null try { - lock = monitorCtx.client!!.suspendUntil { + monitorLock = monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.acquireLock(job, it) } ?: return@launch - logger.debug("lock ${lock!!.lockId} acquired") + logger.debug("lock ${monitorLock.lockId} acquired") logger.debug( "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + monitorCtx.clusterService!!.state().nodes().localNode.id @@ -383,9 +385,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon it ) } + } catch (e: Exception) { + logger.error("Monitor run failed for monitor with id ${job.id}", e) } finally { - monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } - logger.debug("lock ${lock!!.lockId} released") + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(monitorLock, it) } + logger.debug("lock ${monitorLock?.lockId} released") } } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index e3cb5121a..a0a9fe7df 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -958,7 +958,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response } - public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response { + fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response { val requestBody = StringEntity(doc, APPLICATION_JSON) val params = if (refresh) mapOf("refresh" to "true") else mapOf() val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody) @@ -969,6 +969,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response } + fun updateDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response { + val requestBody = StringEntity(doc, APPLICATION_JSON) + val params = if (refresh) mapOf("refresh" to "true") else mapOf() + val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody) + assertTrue( + "Unable to index doc: '${doc.take(15)}...' to index: '$index'", + listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus()) + ) + return response + } + protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response { val params = if (refresh) mapOf("refresh" to "true") else mapOf() val response = client().makeRequest("DELETE", "$index/_doc/$id", params) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 575ccacd9..94ccfde8a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -34,6 +34,7 @@ import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.MILLIS import java.util.Locale import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean class DocumentMonitorRunnerIT : AlertingRestTestCase() { @@ -2789,4 +2790,121 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue(e.message!!.contains("alerting_exception")) } } + + fun `test execute monitor generates alerts and findings with renewable locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES), + enabled = true + ) + ) + assertNotNull(monitor.id) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "2", testDoc) + + var found = AtomicBoolean(false) + OpenSearchTestCase.waitUntil( + { + val res = (searchFindings(monitor).size == 2) + found.set(res) + found.get() + }, 2, TimeUnit.MINUTES + ) + assertEquals(found.get(), true) + + updateMonitor(monitor.copy(enabled = false, enabledTime = null)) + + val currTimeStampMinusTenMinutes = System.currentTimeMillis() - 600000L + val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":${currTimeStampMinusTenMinutes / 1000},\"released\":false}" + updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock) + + updateMonitor(monitor.copy(enabled = true), true) + + indexDoc(testIndex, "4", testDoc) + indexDoc(testIndex, "5", testDoc) + + found = AtomicBoolean(false) + OpenSearchTestCase.waitUntil( + { + val res = (searchFindings(monitor).size == 4) + found.set(res) + found.get() + }, 2, TimeUnit.MINUTES + ) + assertEquals(found.get(), true) + assertTrue(true) + } + + fun `test execute monitor generates alerts and findings with non renewable locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES), + enabled = true + ) + ) + assertNotNull(monitor.id) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "2", testDoc) + + var found = AtomicBoolean(false) + OpenSearchTestCase.waitUntil( + { + val res = (searchFindings(monitor).size == 2) + found.set(res) + found.get() + }, 2, TimeUnit.MINUTES + ) + assertEquals(found.get(), true) + + val currTimeStamp = System.currentTimeMillis() + val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":$currTimeStamp,\"released\":false}" + updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock) + + indexDoc(testIndex, "4", testDoc) + indexDoc(testIndex, "5", testDoc) + + found = AtomicBoolean(false) + OpenSearchTestCase.waitUntil( + { + val res = (searchFindings(monitor).size == 4) + found.set(res) + found.get() + }, 2, TimeUnit.MINUTES + ) + assertEquals(found.get(), false) + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt index 35618e156..0c558038d 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt @@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType @@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.index.seqno.SequenceNumbers import java.io.IOException import java.time.Instant +import java.util.concurrent.TimeUnit private val log = LogManager.getLogger(LockService::class.java) @@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste companion object { const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" + val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES) @JvmStatic fun lockMapping(): String? { @@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste object : ActionListener { override fun onResponse(existingLock: LockModel?) { if (existingLock != null) { + val currentTimestamp = getNow() if (isLockReleased(existingLock)) { log.debug("lock is released or expired: {}", existingLock) val updateLock = LockModel(existingLock, getNow(), false) updateLock(updateLock, listener) } else { - log.debug("Lock is NOT released or expired. {}", existingLock) - listener.onResponse(null) + log.debug("Lock is NOT released. {}", existingLock) + if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_MINUTES.seconds + < currentTimestamp.epochSecond + ) { + log.debug("Lock is expired. Renewing Lock {}", existingLock) + val updateLock = LockModel(existingLock, getNow(), false) + updateLock(updateLock, listener) + } else { + log.debug("Lock is NOT expired. Not running monitor {}", existingLock) + listener.onResponse(null) + } } } else { val tempLock = LockModel(scheduledJobId, getNow(), false) @@ -220,7 +233,7 @@ class LockService(private val client: Client, private val clusterService: Cluste listener: ActionListener ) { if (lock == null) { - log.debug("Lock is null. Nothing to release.") + log.error("Lock is null. Nothing to release.") listener.onResponse(false) } else { log.debug("Releasing lock: {}", lock)