-
Notifications
You must be signed in to change notification settings - Fork 104
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix monitor renew lock issue #1623
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse | |
import org.opensearch.client.Client | ||
import org.opensearch.cluster.service.ClusterService | ||
import org.opensearch.common.settings.Settings | ||
import org.opensearch.common.unit.TimeValue | ||
import org.opensearch.common.xcontent.LoggingDeprecationHandler | ||
import org.opensearch.common.xcontent.XContentFactory | ||
import org.opensearch.common.xcontent.XContentType | ||
|
@@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException | |
import org.opensearch.index.seqno.SequenceNumbers | ||
import java.io.IOException | ||
import java.time.Instant | ||
import java.util.concurrent.TimeUnit | ||
|
||
private val log = LogManager.getLogger(LockService::class.java) | ||
|
||
|
@@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste | |
|
||
companion object { | ||
const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" | ||
val LOCK_EXPIRED_MINUTES = TimeValue(5, TimeUnit.MINUTES) | ||
|
||
@JvmStatic | ||
fun lockMapping(): String? { | ||
|
@@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste | |
object : ActionListener<LockModel> { | ||
override fun onResponse(existingLock: LockModel?) { | ||
if (existingLock != null) { | ||
val currentTimestamp = getNow() | ||
if (isLockReleased(existingLock)) { | ||
log.debug("lock is released or expired: {}", existingLock) | ||
val updateLock = LockModel(existingLock, getNow(), false) | ||
updateLock(updateLock, listener) | ||
} else { | ||
log.debug("Lock is NOT released or expired. {}", existingLock) | ||
listener.onResponse(null) | ||
log.debug("Lock is NOT released. {}", existingLock) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. plz add a test to verify expired locks are retrieved and deleted/re-used instead. create in now()- 10 mins There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_MINUTES.seconds | ||
< currentTimestamp.epochSecond | ||
) { | ||
log.debug("Lock is expired. Renewing Lock {}", existingLock) | ||
val updateLock = LockModel(existingLock, getNow(), false) | ||
updateLock(updateLock, listener) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What happens if the previous node that owned the lock resumes its execution? i.e. node holding lock was unresponsive due to high traffic and temporarily left the cluster. After traffic subsides it rejoins the cluster There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Related question - are there any race conditions with multiple nodes attempting to claim an expired lock? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hi @engechas , each lock is specific to a monitor. & if the node running the old job goes down, the job is cancelled. the monitor job is then reassigned to a new node & it can then try to renew the lock with this fix now. |
||
} else { | ||
log.debug("Lock is NOT expired. Not running monitor {}", existingLock) | ||
listener.onResponse(null) | ||
} | ||
} | ||
} else { | ||
val tempLock = LockModel(scheduledJobId, getNow(), false) | ||
|
@@ -220,7 +233,7 @@ class LockService(private val client: Client, private val clusterService: Cluste | |
listener: ActionListener<Boolean> | ||
) { | ||
if (lock == null) { | ||
log.debug("Lock is null. Nothing to release.") | ||
log.error("Lock is null. Nothing to release.") | ||
listener.onResponse(false) | ||
} else { | ||
log.debug("Releasing lock: {}", lock) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this to error log : https://github.com/opensearch-project/alerting/pull/1623/files#diff-7a7546a20937ab41ffd48d77f0d0ba9b9b6c5031289f2e2eb7543167b99701cbR236
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and catch between try and finally for lock acquire logic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added catch between try & finally block.