Skip to content

Commit

Permalink
fix(): migrate from @Volatile and atomics to kotlin coroutine's mut…
Browse files Browse the repository at this point in the history
…exes
  • Loading branch information
aivinog1 committed Feb 28, 2023
1 parent c28d225 commit b337b40
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,23 @@ import io.camunda.zeebe.client.api.worker.JobWorker
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import mu.KLogging
import org.camunda.community.extension.coworker.zeebe.worker.handler.JobExecutableFactory
import java.io.Closeable
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlin.math.roundToInt
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.ExperimentalTime

/**
* Port of [io.camunda.zeebe.client.impl.worker.JobWorkerImpl] but with Kotlin coroutines
*/
class JobCoworker(
private val maxJobsActive: Int,
private val activationThreshold: Int = (maxJobsActive * 0.3f).roundToInt(),
private val remainingJobs: AtomicInteger = AtomicInteger(0),
private val scheduledCoroutineContext: CoroutineContext,
private val jobExecutableFactory: JobExecutableFactory,
private val initialPollInterval: Duration,
Expand All @@ -33,53 +31,83 @@ class JobCoworker(
private val additionalCoroutineContextProvider: JobCoroutineContextProvider
) : JobWorker, Closeable {

private val acquiringJobs = AtomicBoolean(true)
private val claimableJobPoller: AtomicReference<JobPoller?> = AtomicReference(jobPoller)
private val isPollScheduled = AtomicBoolean(false)
private val remainingJobsMutex = Mutex()
private var remainingJobs = 0

@Volatile
private val isPollScheduledMutex = Mutex()
private var isPollScheduled = false

private val acquiringJobsMutex = Mutex()
private var acquiringJobs = true

private val pollIntervalMutex = Mutex()
private var pollInterval = initialPollInterval

private val jobPollerMutex = Mutex()
private var claimableJobPoller: JobPoller? = jobPoller

init {
schedulePoll()
runBlocking {
schedulePoll()
}
}

override fun isOpen(): Boolean {
return acquiringJobs.get()
return runBlocking { isOpenWithSuspension() }
}

suspend fun isOpenWithSuspension(): Boolean {
return acquiringJobsMutex.withLock { acquiringJobs }
}

override fun isClosed(): Boolean {
return !isOpen && claimableJobPoller.get() != null && remainingJobs.get() <= 0
return runBlocking { isClosedWithSuspension() }
}

suspend fun isClosedWithSuspension(): Boolean {
return !isOpenWithSuspension() && jobPollerMutex.withLock { claimableJobPoller } != null && remainingJobsMutex.withLock { remainingJobs <= 0 }
}

override fun close() {
acquiringJobs.set(false)
return runBlocking { closeWithSuspension() }
}

suspend fun closeWithSuspension() {
acquiringJobsMutex.withLock { acquiringJobs = false }
}

@OptIn(ExperimentalTime::class)
private fun schedulePoll() {
if (isPollScheduled.compareAndSet(false, true)) {
private suspend fun schedulePoll() {
if (isPollScheduledMutex.withLock {
if (!isPollScheduled) {
isPollScheduled = true
}
isPollScheduled
}) {
CoroutineScope(scheduledCoroutineContext).launch {
delay(pollInterval)
delay(pollIntervalMutex.withLock { pollInterval })
onScheduledPoll()
}
}
}

private suspend fun onScheduledPoll() {
isPollScheduled.set(false)
val actualRemainingJobs = remainingJobs.get()
isPollScheduledMutex.withLock { isPollScheduled = false }
val actualRemainingJobs = remainingJobsMutex.withLock { remainingJobs }
if (shouldPoll(actualRemainingJobs)) {
tryPoll()
}
}

private fun shouldPoll(remainingJobs: Int): Boolean {
return acquiringJobs.get() && remainingJobs <= activationThreshold
private suspend fun shouldPoll(remainingJobs: Int): Boolean {
return acquiringJobsMutex.withLock { acquiringJobs } && remainingJobs <= activationThreshold
}

private fun tryClaimJobPoller(): JobPoller? {
return claimableJobPoller.getAndSet(null)
private suspend fun tryClaimJobPoller(): JobPoller? {
return jobPollerMutex.withLock {
val result = claimableJobPoller
claimableJobPoller = null
result
}
}

private suspend fun tryPoll() {
Expand All @@ -99,7 +127,7 @@ class JobCoworker(
private suspend fun poll(jobPoller: JobPoller) {
// check the condition again within the critical section
// to avoid race conditions that would let us exceed the buffer size
val actualRemainingJobs = remainingJobs.get()
val actualRemainingJobs = remainingJobsMutex.withLock { remainingJobs }
if (!shouldPoll(actualRemainingJobs)) {
logger.trace { "Expected to activate for jobs, but still enough remain. Reschedule poll." }
releaseJobPoller(jobPoller)
Expand All @@ -112,41 +140,48 @@ class JobCoworker(
{ job: ActivatedJob -> handleJob(job) },
{ activatedJobs: Int -> onPollSuccess(jobPoller, activatedJobs) },
{ error: Throwable -> onPollError(jobPoller, error) }
) { this.isOpen }
) { this.isOpenWithSuspension() }
}

private fun releaseJobPoller(jobPoller: JobPoller) {
claimableJobPoller.set(jobPoller)
private suspend fun releaseJobPoller(jobPoller: JobPoller) {
jobPollerMutex.withLock { claimableJobPoller = jobPoller }
}

/** Apply the backoff strategy by scheduling the next poll at a new interval */
private fun backoff(jobPoller: JobPoller, error: Throwable) {
val prevInterval = pollInterval
pollInterval = try {
backoffSupplier.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
private suspend fun backoff(jobPoller: JobPoller, error: Throwable) {
val prevInterval = pollIntervalMutex.withLock { pollInterval }
try {
val delay = backoffSupplier.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
pollIntervalMutex.withLock { pollInterval = delay }
} catch (e: Exception) {
logger.warn(e) { "Expected to supply retry delay, but an exception was thrown. Falling back to default backoff supplier" }
DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
val defaultDelay = DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
pollIntervalMutex.withLock { pollInterval = defaultDelay }
}
val delay = pollIntervalMutex.withLock { pollInterval }
logger.debug {
"Failed to activate jobs due to ${error.message}, delay retry for $pollInterval ms"
"Failed to activate jobs due to ${error.message}, delay retry for $delay ms"
}
releaseJobPoller(jobPoller)
schedulePoll()
}

private fun onPollSuccess(jobPoller: JobPoller, activatedJobs: Int) {
private suspend fun onPollSuccess(jobPoller: JobPoller, activatedJobs: Int) {
// first release, then lookup remaining jobs, to allow handleJobFinished() to poll
releaseJobPoller(jobPoller)
val actualRemainingJobs = remainingJobs.addAndGet(activatedJobs)
val actualRemainingJobs = remainingJobsMutex.withLock {
val result = remainingJobs
remainingJobs += activatedJobs
result
}
pollInterval = initialPollInterval
if (actualRemainingJobs <= 0) {
schedulePoll()
}
// if jobs were activated, then successive polling happens due to handleJobFinished
}

private fun onPollError(jobPoller: JobPoller, error: Throwable) {
private suspend fun onPollError(jobPoller: JobPoller, error: Throwable) {
backoff(jobPoller, error)
}

Expand All @@ -156,13 +191,17 @@ class JobCoworker(
}

private suspend fun handleJobFinished() {
val actualRemainingJobs = remainingJobs.decrementAndGet()
if (!isPollScheduled.get() && shouldPoll(actualRemainingJobs)) {
val actualRemainingJobs = remainingJobsMutex.withLock {
val result = remainingJobs
remainingJobs -= 1
result
}
if (isPollScheduledMutex.withLock { !isPollScheduled } && shouldPoll(actualRemainingJobs)) {
tryPoll()
}
}

companion object: KLogging() {
companion object : KLogging() {
private val DEFAULT_BACKOFF_SUPPLIER = BackoffSupplier.newBackoffBuilder().build()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import io.camunda.zeebe.client.api.JsonMapper
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.BackoffSupplier
import io.camunda.zeebe.client.api.worker.JobClient
import io.camunda.zeebe.client.api.worker.JobWorker
import io.camunda.zeebe.client.impl.command.ArgumentUtil.ensureGreaterThan
import io.camunda.zeebe.client.impl.command.ArgumentUtil.ensureNotNullNorEmpty
import io.camunda.zeebe.client.impl.worker.JobWorkerBuilderImpl
Expand Down Expand Up @@ -54,7 +53,7 @@ class JobCoworkerBuilder(
ensureGreaterThan("maxJobsActive", maxJobsActive.toLong(), 0)
}

fun open(): JobWorker {
fun open(): JobCoworker {
checkPreconditions()
val requestBuilder = GatewayOuterClass.ActivateJobsRequest.newBuilder()
.setType(jobType)
Expand Down

0 comments on commit b337b40

Please sign in to comment.