diff --git a/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/JobCoworker.kt b/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/JobCoworker.kt
index 6935f2e..52240b6 100644
--- a/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/JobCoworker.kt
+++ b/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/JobCoworker.kt
@@ -6,17 +6,16 @@ import io.camunda.zeebe.client.api.worker.JobWorker
 import kotlinx.coroutines.CoroutineScope
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
+import kotlinx.coroutines.sync.Mutex
+import kotlinx.coroutines.sync.withLock
 import mu.KLogging
 import org.camunda.community.extension.coworker.zeebe.worker.handler.JobExecutableFactory
 import java.io.Closeable
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-import java.util.concurrent.atomic.AtomicReference
 import kotlin.coroutines.CoroutineContext
 import kotlin.math.roundToInt
 import kotlin.time.Duration
 import kotlin.time.Duration.Companion.milliseconds
-import kotlin.time.ExperimentalTime
 
 /**
  * Port of [io.camunda.zeebe.client.impl.worker.JobWorkerImpl] but with Kotlin coroutines
@@ -24,7 +23,6 @@ import kotlin.time.ExperimentalTime
 class JobCoworker(
     private val maxJobsActive: Int,
     private val activationThreshold: Int = (maxJobsActive * 0.3f).roundToInt(),
-    private val remainingJobs: AtomicInteger = AtomicInteger(0),
     private val scheduledCoroutineContext: CoroutineContext,
     private val jobExecutableFactory: JobExecutableFactory,
     private val initialPollInterval: Duration,
@@ -33,53 +31,83 @@ class JobCoworker(
     private val additionalCoroutineContextProvider: JobCoroutineContextProvider
 ) : JobWorker, Closeable {
 
-    private val acquiringJobs = AtomicBoolean(true)
-    private val claimableJobPoller: AtomicReference<JobPoller?> = AtomicReference(jobPoller)
-    private val isPollScheduled = AtomicBoolean(false)
+    private val remainingJobsMutex = Mutex()
+    private var remainingJobs = 0
 
-    @Volatile
+    private val isPollScheduledMutex = Mutex()
+    private var isPollScheduled = false
+
+    private val acquiringJobsMutex = Mutex()
+    private var acquiringJobs = true
+
+    private val pollIntervalMutex = Mutex()
     private var pollInterval = initialPollInterval
 
+    private val jobPollerMutex = Mutex()
+    private var claimableJobPoller: JobPoller? = jobPoller
+
     init {
-        schedulePoll()
+        runBlocking {
+            schedulePoll()
+        }
     }
 
     override fun isOpen(): Boolean {
-        return acquiringJobs.get()
+        return runBlocking { isOpenWithSuspension() }
+    }
+
+    suspend fun isOpenWithSuspension(): Boolean {
+        return acquiringJobsMutex.withLock { acquiringJobs }
     }
 
     override fun isClosed(): Boolean {
-        return !isOpen && claimableJobPoller.get() != null && remainingJobs.get() <= 0
+        return runBlocking { isClosedWithSuspension() }
+    }
+
+    suspend fun isClosedWithSuspension(): Boolean {
+        return !isOpenWithSuspension() && jobPollerMutex.withLock { claimableJobPoller } != null && remainingJobsMutex.withLock { remainingJobs <= 0 }
     }
 
     override fun close() {
-        acquiringJobs.set(false)
+        return runBlocking { closeWithSuspension() }
+    }
+
+    suspend fun closeWithSuspension() {
+        acquiringJobsMutex.withLock { acquiringJobs = false }
     }
 
-    @OptIn(ExperimentalTime::class)
-    private fun schedulePoll() {
-        if (isPollScheduled.compareAndSet(false, true)) {
+    private suspend fun schedulePoll() {
+        // Equivalent of compareAndSet(false, true): report true only to the caller that
+        // actually flipped the flag, so at most one poll is scheduled at a time.
+        val claimed = isPollScheduledMutex.withLock {
+            (!isPollScheduled).also { isPollScheduled = true }
+        }
+        if (claimed) {
             CoroutineScope(scheduledCoroutineContext).launch {
-                delay(pollInterval)
+                delay(pollIntervalMutex.withLock { pollInterval })
                 onScheduledPoll()
             }
         }
     }
 
     private suspend fun onScheduledPoll() {
-        isPollScheduled.set(false)
-        val actualRemainingJobs = remainingJobs.get()
+        isPollScheduledMutex.withLock { isPollScheduled = false }
+        val actualRemainingJobs = remainingJobsMutex.withLock { remainingJobs }
         if (shouldPoll(actualRemainingJobs)) {
             tryPoll()
         }
     }
 
-    private fun shouldPoll(remainingJobs: Int): Boolean {
-        return acquiringJobs.get() && remainingJobs <= activationThreshold
+    private suspend fun shouldPoll(remainingJobs: Int): Boolean {
+        return acquiringJobsMutex.withLock { acquiringJobs } && remainingJobs <= activationThreshold
     }
 
-    private fun tryClaimJobPoller(): JobPoller? {
-        return claimableJobPoller.getAndSet(null)
+    private suspend fun tryClaimJobPoller(): JobPoller? {
+        return jobPollerMutex.withLock {
+            val result = claimableJobPoller
+            claimableJobPoller = null
+            result
+        }
     }
 
     private suspend fun tryPoll() {
@@ -99,7 +127,7 @@ class JobCoworker(
     private suspend fun poll(jobPoller: JobPoller) {
         // check the condition again within the critical section
         // to avoid race conditions that would let us exceed the buffer size
-        val actualRemainingJobs = remainingJobs.get()
+        val actualRemainingJobs = remainingJobsMutex.withLock { remainingJobs }
         if (!shouldPoll(actualRemainingJobs)) {
             logger.trace { "Expected to activate for jobs, but still enough remain. Reschedule poll." }
             releaseJobPoller(jobPoller)
@@ -112,33 +140,40 @@ class JobCoworker(
             { job: ActivatedJob -> handleJob(job) },
             { activatedJobs: Int -> onPollSuccess(jobPoller, activatedJobs) },
             { error: Throwable -> onPollError(jobPoller, error) }
-        ) { this.isOpen }
+        ) { this.isOpenWithSuspension() }
     }
 
-    private fun releaseJobPoller(jobPoller: JobPoller) {
-        claimableJobPoller.set(jobPoller)
+    private suspend fun releaseJobPoller(jobPoller: JobPoller) {
+        jobPollerMutex.withLock { claimableJobPoller = jobPoller }
     }
 
     /** Apply the backoff strategy by scheduling the next poll at a new interval */
-    private fun backoff(jobPoller: JobPoller, error: Throwable) {
-        val prevInterval = pollInterval
-        pollInterval = try {
+    private suspend fun backoff(jobPoller: JobPoller, error: Throwable) {
+        val prevInterval = pollIntervalMutex.withLock { pollInterval }
+        // Only the non-suspending supplier call sits inside try/catch, so a
+        // CancellationException raised while waiting on the mutex is never swallowed.
+        val nextInterval = try {
             backoffSupplier.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
         } catch (e: Exception) {
             logger.warn(e) { "Expected to supply retry delay, but an exception was thrown. Falling back to default backoff supplier" }
             DEFAULT_BACKOFF_SUPPLIER.supplyRetryDelay(prevInterval.inWholeMilliseconds).milliseconds
         }
+        pollIntervalMutex.withLock { pollInterval = nextInterval }
         logger.debug {
-            "Failed to activate jobs due to ${error.message}, delay retry for $pollInterval ms"
+            "Failed to activate jobs due to ${error.message}, delay retry for $nextInterval ms"
         }
         releaseJobPoller(jobPoller)
         schedulePoll()
     }
 
-    private fun onPollSuccess(jobPoller: JobPoller, activatedJobs: Int) {
+    private suspend fun onPollSuccess(jobPoller: JobPoller, activatedJobs: Int) {
         // first release, then lookup remaining jobs, to allow handleJobFinished() to poll
         releaseJobPoller(jobPoller)
-        val actualRemainingJobs = remainingJobs.addAndGet(activatedJobs)
-        pollInterval = initialPollInterval
+        // addAndGet equivalent: the decision below needs the count after the addition
+        val actualRemainingJobs = remainingJobsMutex.withLock {
+            remainingJobs += activatedJobs
+            remainingJobs
+        }
+        pollIntervalMutex.withLock { pollInterval = initialPollInterval }
         if (actualRemainingJobs <= 0) {
             schedulePoll()
@@ -146,7 +181,7 @@ class JobCoworker(
         // if jobs were activated, then successive polling happens due to handleJobFinished
     }
 
-    private fun onPollError(jobPoller: JobPoller, error: Throwable) {
+    private suspend fun onPollError(jobPoller: JobPoller, error: Throwable) {
         backoff(jobPoller, error)
     }
 
@@ -156,13 +191,17 @@ class JobCoworker(
     }
 
     private suspend fun handleJobFinished() {
-        val actualRemainingJobs = remainingJobs.decrementAndGet()
-        if (!isPollScheduled.get() && shouldPoll(actualRemainingJobs)) {
+        // decrementAndGet equivalent: the count after the decrement drives the poll decision
+        val actualRemainingJobs = remainingJobsMutex.withLock {
+            remainingJobs -= 1
+            remainingJobs
+        }
+        if (isPollScheduledMutex.withLock { !isPollScheduled } && shouldPoll(actualRemainingJobs)) {
             tryPoll()
         }
     }
 
-    companion object: KLogging() {
+    companion object : KLogging() {
         private val DEFAULT_BACKOFF_SUPPLIER = BackoffSupplier.newBackoffBuilder().build()
     }
 }
diff --git a/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/builder/JobCoworkerBuilder.kt b/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/builder/JobCoworkerBuilder.kt
index 524071a..c97fcba 100644
--- a/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/builder/JobCoworkerBuilder.kt
+++ b/coworker-core/src/main/kotlin/org/camunda/community/extension/coworker/zeebe/worker/builder/JobCoworkerBuilder.kt
@@ -5,7 +5,6 @@ import io.camunda.zeebe.client.api.JsonMapper
 import io.camunda.zeebe.client.api.response.ActivatedJob
 import io.camunda.zeebe.client.api.worker.BackoffSupplier
 import io.camunda.zeebe.client.api.worker.JobClient
-import io.camunda.zeebe.client.api.worker.JobWorker
 import io.camunda.zeebe.client.impl.command.ArgumentUtil.ensureGreaterThan
 import io.camunda.zeebe.client.impl.command.ArgumentUtil.ensureNotNullNorEmpty
 import io.camunda.zeebe.client.impl.worker.JobWorkerBuilderImpl
@@ -54,7 +53,7 @@ class JobCoworkerBuilder(
         ensureGreaterThan("maxJobsActive", maxJobsActive.toLong(), 0)
     }
 
-    fun open(): JobWorker {
+    fun open(): JobCoworker {
         checkPreconditions()
         val requestBuilder = GatewayOuterClass.ActivateJobsRequest.newBuilder()
             .setType(jobType)