fix monitor renew lock issue #1623

Merged: 3 commits, Jul 31, 2024 (changes from 1 commit shown)
@@ -331,12 +331,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
         when (job) {
             is Workflow -> {
                 launch {
-                    var lock: LockModel? = null
+                    var workflowLock: LockModel? = null
Member: and catch between try and finally for lock acquire logic

Collaborator (author): added catch between try & finally block.
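
That follow-up commit is not part of this view; based only on the code shown in this diff, the shape being asked for is roughly the following. The body of the catch block is an assumption, not the author's actual change.

```kotlin
launch {
    var workflowLock: LockModel? = null
    try {
        workflowLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
            monitorCtx.lockService!!.acquireLock(job, it)
        } ?: return@launch
        logger.debug("lock ${workflowLock.lockId} acquired")
        // ... execute the workflow as in the diff below ...
    } catch (e: Exception) {
        // Hypothetical handling; the actual message in the follow-up commit may differ.
        logger.error("Workflow run failed for workflow ${job.id}", e)
    } finally {
        monitorCtx.client!!.suspendUntil<Client, Boolean> {
            monitorCtx.lockService!!.release(workflowLock, it)
        }
        logger.debug("lock ${workflowLock?.lockId} released")
    }
}
```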

                     try {
-                        lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
+                        workflowLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
                             monitorCtx.lockService!!.acquireLock(job, it)
                         } ?: return@launch
-                        logger.debug("lock ${lock!!.lockId} acquired")
+                        logger.debug("lock ${workflowLock.lockId} acquired")

                         monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
                             monitorCtx.client!!.execute(
@@ -352,19 +352,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
                             )
                         }
                     } finally {
-                        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
-                        logger.debug("lock ${lock?.lockId} released")
+                        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(workflowLock, it) }
+                        logger.debug("lock ${workflowLock?.lockId} released")
                     }
                 }
             }
             is Monitor -> {
                 launch {
-                    var lock: LockModel? = null
+                    var monitorLock: LockModel? = null
                     try {
-                        lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
+                        monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
                             monitorCtx.lockService!!.acquireLock(job, it)
                         } ?: return@launch
-                        logger.debug("lock ${lock!!.lockId} acquired")
+                        logger.debug("lock ${monitorLock.lockId} acquired")
                         logger.debug(
                             "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
                                 monitorCtx.clusterService!!.state().nodes().localNode.id
@@ -384,8 +384,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
                             )
                         }
                     } finally {
-                        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
-                        logger.debug("lock ${lock?.lockId} released")
+                        monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
+                        logger.debug("lock ${monitorLock?.lockId} released")
                     }
                 }
             }
@@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse
 import org.opensearch.client.Client
 import org.opensearch.cluster.service.ClusterService
 import org.opensearch.common.settings.Settings
+import org.opensearch.common.unit.TimeValue
 import org.opensearch.common.xcontent.LoggingDeprecationHandler
 import org.opensearch.common.xcontent.XContentFactory
 import org.opensearch.common.xcontent.XContentType
@@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException
 import org.opensearch.index.seqno.SequenceNumbers
 import java.io.IOException
 import java.time.Instant
+import java.util.concurrent.TimeUnit

 private val log = LogManager.getLogger(LockService::class.java)

@@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: ClusterService) {

     companion object {
         const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock"
+        val LOCK_EXPIRED_SECONDS = TimeValue(5, TimeUnit.MINUTES)
Member: this is not holding seconds?

Collaborator (author): changed to mins.
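
The rename the author mentions is not visible in this commit's diff, so the name below is hypothetical. Either way, the expiry comparison reads the value through TimeValue's seconds getter, so the behavior is the same:

```kotlin
// Hypothetical rename -- the final constant name in the merged commit is not shown here.
val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES)

// TimeValue converts on read, so the expiry check can keep comparing epoch seconds:
// LOCK_EXPIRED_MINUTES.seconds == 300
```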


         @JvmStatic
         fun lockMapping(): String? {
@@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: ClusterService) {
             object : ActionListener<LockModel> {
                 override fun onResponse(existingLock: LockModel?) {
                     if (existingLock != null) {
+                        val currentTimestamp = getNow()
                         if (isLockReleased(existingLock)) {
                             log.debug("lock is released or expired: {}", existingLock)
                             val updateLock = LockModel(existingLock, getNow(), false)
                             updateLock(updateLock, listener)
                         } else {
-                            log.debug("Lock is NOT released or expired. {}", existingLock)
-                            listener.onResponse(null)
+                            log.debug("Lock is NOT released. {}", existingLock)
Member: plz add a test to verify expired locks are retrieved and deleted/re-used instead. create in now() - 10 mins
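
A test along the lines being requested might look roughly like this. `createLockWithTime`, `acquireLockBlocking`, and `monitor` are hypothetical stand-ins for whatever the test suite actually provides, and the assertion style depends on the test framework in use.

```kotlin
// Sketch of the requested test; helper names are hypothetical.
fun `test expired lock is renewed instead of blocking the monitor`() {
    // Index a lock whose lockTime is 10 minutes in the past, i.e. older than
    // the 5-minute expiry window.
    val staleTime = Instant.now().minus(10, ChronoUnit.MINUTES)
    val staleLock = createLockWithTime(jobId = monitor.id, lockTime = staleTime, released = false)

    // Acquiring the lock for the same job should succeed by renewing the
    // expired lock rather than returning null.
    val renewed = acquireLockBlocking(monitor)
    assertNotNull(renewed)
    assertEquals(staleLock.lockId, renewed!!.lockId)
    assertTrue(renewed.lockTime.isAfter(staleTime))
}
```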

+                            if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_SECONDS.seconds
+                                < currentTimestamp.epochSecond
+                            ) {
+                                log.debug("Lock is expired. Renewing Lock {}", existingLock)
+                                val updateLock = LockModel(existingLock, getNow(), false)
+                                updateLock(updateLock, listener)
Collaborator: What happens if the previous node that owned the lock resumes its execution? i.e. the node holding the lock was unresponsive due to high traffic and temporarily left the cluster; after the traffic subsides it rejoins the cluster.

Collaborator: Related question: are there any race conditions with multiple nodes attempting to claim an expired lock?

Collaborator (author): Hi @engechas, each lock is specific to a monitor, and if the node running the old job goes down, the job is cancelled. The monitor job is then reassigned to a new node, which can then try to renew the lock with this fix.
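
To make the takeover condition concrete, here is the arithmetic behind the check added above, with illustrative timestamps:

```kotlin
// Worked example of the expiry check (timestamps are illustrative).
// lockTime = 12:00:00                -> epochSecond = T
// LOCK_EXPIRED_SECONDS.seconds = 300 (5 minutes)
// now      = 12:06:00                -> epochSecond = T + 360
// T + 300 < T + 360, so the lock counts as expired and the node that now owns
// the monitor job renews it. At 12:04:00 (T + 240) the check fails instead,
// acquireLock returns null, and the monitor run is skipped.
val expired = existingLock.lockTime.epochSecond + LOCK_EXPIRED_SECONDS.seconds <
    currentTimestamp.epochSecond
```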

+                            } else {
+                                log.debug("Lock is NOT expired. {}", existingLock)
Member: plz explicitly mention we don't run, e.g. "Lock is NOT expired. Not running monitor"

Collaborator (author): fixed it.
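
Judging from the reply, the fix presumably lands close to the reviewer's suggested wording, for example:

```kotlin
// Likely shape of the fix; the exact wording in the merged commit may differ.
log.debug("Lock is NOT expired. Not running monitor. {}", existingLock)
```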

+                                listener.onResponse(null)
+                            }
                         }
                     } else {
                         val tempLock = LockModel(scheduledJobId, getNow(), false)