Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.13] [Backport 2.x] fix monitor renew lock issue #1630

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/security-test-workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jobs:
if docker pull opensearchstaging/opensearch:$docker_version
then
echo "FROM opensearchstaging/opensearch:$docker_version" >> Dockerfile
echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-security-analytics ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-security-analytics; fi" >> Dockerfile
echo "RUN if [ -d /usr/share/opensearch/plugins/opensearch-alerting ]; then /usr/share/opensearch/bin/opensearch-plugin remove opensearch-alerting; fi" >> Dockerfile
echo "ADD alerting/build/distributions/$plugin /tmp/" >> Dockerfile
echo "RUN /usr/share/opensearch/bin/opensearch-plugin install --batch file:/tmp/$plugin" >> Dockerfile
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
var lock: LockModel? = null
var workflowLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
workflowLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${workflowLock.lockId} acquired")

monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
Expand All @@ -333,20 +333,22 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Workflow run failed for workflow with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(workflowLock, it) }
logger.debug("lock ${workflowLock?.lockId} released")
}
}
}
is Monitor -> {
launch {
var lock: LockModel? = null
var monitorLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${monitorLock.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
Expand All @@ -365,9 +367,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
it
)
}
} catch (e: Exception) {
logger.error("Monitor run failed for monitor with id ${job.id}", e)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody)
Expand All @@ -932,6 +932,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

fun updateDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
)
return response
}

protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean

class DocumentMonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -2789,4 +2790,121 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue(e.message!!.contains("alerting_exception"))
}
}

fun `test execute monitor generates alerts and findings with renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
enabled = true
)
)
assertNotNull(monitor.id)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

var found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 2)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)

updateMonitor(monitor.copy(enabled = false, enabledTime = null))

val currTimeStampMinusTenMinutes = System.currentTimeMillis() - 600000L
val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":${currTimeStampMinusTenMinutes / 1000},\"released\":false}"
updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

updateMonitor(monitor.copy(enabled = true), true)

indexDoc(testIndex, "4", testDoc)
indexDoc(testIndex, "5", testDoc)

found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 4)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)
assertTrue(true)
}

fun `test execute monitor generates alerts and findings with non renewable locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES),
enabled = true
)
)
assertNotNull(monitor.id)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "2", testDoc)

var found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 2)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), true)

val currTimeStamp = System.currentTimeMillis()
val lock = "{\"scheduled_job_id\":\"${monitor.id}\",\"lock_time\":$currTimeStamp,\"released\":false}"
updateDoc(client(), LockService.LOCK_INDEX_NAME, "${monitor.id}-lock", lock)

indexDoc(testIndex, "4", testDoc)
indexDoc(testIndex, "5", testDoc)

found = AtomicBoolean(false)
OpenSearchTestCase.waitUntil(
{
val res = (searchFindings(monitor).size == 4)
found.set(res)
found.get()
}, 2, TimeUnit.MINUTES
)
assertEquals(found.get(), false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
Expand All @@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.index.seqno.SequenceNumbers
import java.io.IOException
import java.time.Instant
import java.util.concurrent.TimeUnit

private val log = LogManager.getLogger(LockService::class.java)

Expand All @@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste

companion object {
const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock"
val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES)

@JvmStatic
fun lockMapping(): String? {
Expand Down Expand Up @@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste
object : ActionListener<LockModel> {
override fun onResponse(existingLock: LockModel?) {
if (existingLock != null) {
val currentTimestamp = getNow()
if (isLockReleased(existingLock)) {
log.debug("lock is released or expired: {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT released or expired. {}", existingLock)
listener.onResponse(null)
log.debug("Lock is NOT released. {}", existingLock)
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_MINUTES.seconds
< currentTimestamp.epochSecond
) {
log.debug("Lock is expired. Renewing Lock {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT expired. Not running monitor {}", existingLock)
listener.onResponse(null)
}
}
} else {
val tempLock = LockModel(scheduledJobId, getNow(), false)
Expand Down Expand Up @@ -220,7 +233,7 @@ class LockService(private val client: Client, private val clusterService: Cluste
listener: ActionListener<Boolean>
) {
if (lock == null) {
log.debug("Lock is null. Nothing to release.")
log.error("Lock is null. Nothing to release.")
listener.onResponse(false)
} else {
log.debug("Releasing lock: {}", lock)
Expand Down
Loading