Skip to content

Commit

Permalink
fix(): Migrate the metric's incrementation in the JobErrorHandler
Browse files Browse the repository at this point in the history
(cherry picked from commit 7ba409c)
  • Loading branch information
aivinog1 committed May 2, 2023
1 parent ca4d7ab commit 619bad8
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ open class CoworkerAutoConfiguration {

@ConditionalOnMissingBean
@Bean
open fun defaultJobErrorHandler(): JobErrorHandler = DefaultSpringZeebeErrorHandler()
open fun defaultJobErrorHandler(
metricsRecorder: MetricsRecorder
): JobErrorHandler = DefaultSpringZeebeErrorHandler(metricsRecorder = metricsRecorder)

@ConditionalOnMissingBean
@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,8 @@ class CoworkerManager(
val coWorker = cozeebe.newCoWorker(coworkerValue.type) { client, job ->
suspendCoroutineUninterceptedOrReturn { it: Continuation<Any> ->
val args = createArguments(client, job, it, coworkerValue.methodInfo.parameters)
try {
metricsRecorder.increase(MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_ACTIVATED, job.type)
coworkerValue.methodInfo.invoke(*(args.toTypedArray()))
} catch (ex: Exception) {
metricsRecorder.increase(MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_FAILED, job.type)
throw ex
}
metricsRecorder.increase(MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_ACTIVATED, job.type)
coworkerValue.methodInfo.invoke(*(args.toTypedArray()))
}
}.also { builder: JobCoworkerBuilder ->
builder.additionalCoroutineContextProvider = jobCoroutineContextProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package org.camunda.community.extension.coworker.spring.error

import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.JobClient
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder
import org.camunda.community.extension.coworker.zeebe.worker.handler.error.JobErrorHandler
import org.camunda.community.extension.coworker.zeebe.worker.handler.error.impl.DefaultJobErrorHandler

class DefaultSpringZeebeErrorHandler(
private val jobErrorHandler: JobErrorHandler = DefaultJobErrorHandler()
private val jobErrorHandler: JobErrorHandler = DefaultJobErrorHandler(),
private val metricsRecorder: MetricsRecorder
): JobErrorHandler {
override suspend fun handleError(e: Exception, activatedJob: ActivatedJob, jobClient: JobClient) {
metricsRecorder.increase(MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_FAILED, activatedJob.type)
jobErrorHandler.handleError(e.stripSpringZeebeExceptionIfNeeded(), activatedJob, jobClient)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.camunda.community.extension.coworker.spring

import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.spring.client.config.ZeebeClientStarterAutoConfiguration
import io.camunda.zeebe.spring.client.metrics.DefaultNoopMetricsRecorder
import io.camunda.zeebe.spring.test.ZeebeSpringTest
import kotlinx.coroutines.slf4j.MDCContext
import mu.KLogging
Expand Down Expand Up @@ -49,7 +50,7 @@ class CoworkerAutoConfigurationSpringBootTest {

@Bean
open fun customErrorHandler(): JobErrorHandler {
val defaultJobErrorHandler = DefaultSpringZeebeErrorHandler()
val defaultJobErrorHandler = DefaultSpringZeebeErrorHandler(metricsRecorder = DefaultNoopMetricsRecorder())
return JobErrorHandler { e, activatedJob, jobClient ->
logger.error(e) { "Got error: ${e.message}, on job: $activatedJob" }
defaultJobErrorHandler.handleError(e, activatedJob, jobClient)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.camunda.community.extension.coworker.spring

import com.ninjasquad.springmockk.MockkBean
import com.ninjasquad.springmockk.clear
import io.camunda.zeebe.client.ZeebeClient
import io.camunda.zeebe.client.api.response.ActivatedJob
import io.camunda.zeebe.client.api.worker.JobClient
import io.camunda.zeebe.model.bpmn.Bpmn
import io.camunda.zeebe.spring.client.config.ZeebeClientStarterAutoConfiguration
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder
import io.camunda.zeebe.spring.test.ZeebeSpringTest
import io.mockk.clearMocks
import io.mockk.verify
import kotlinx.coroutines.future.await
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.camunda.community.extension.coworker.spring.annotation.Coworker
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration
import org.springframework.boot.test.context.SpringBootTest

private const val PASS_NAME = "pass"
private const val THROW_NAME = "throw"

@ZeebeSpringTest
@SpringBootTest(
classes = [
JacksonAutoConfiguration::class,
ZeebeClientStarterAutoConfiguration::class,
CoworkerAutoConfiguration::class,
MetricsTest::class
]
)
class MetricsTest {

@Autowired
private lateinit var zeebeClient: ZeebeClient

@MockkBean(relaxed = true)
private lateinit var metricsRecorder: MetricsRecorder

@Coworker(type = PASS_NAME)
suspend fun testPass(jobClient: JobClient, job: ActivatedJob) {
jobClient.newCompleteCommand(job).send().await()
}

@Coworker(type = "throw")
suspend fun testThrow(jobClient: JobClient, job: ActivatedJob) {
throw Exception("Something goes wrooong!")
}

@Test
fun `should increase metrics on invocation only`() {
// given
clearMocks(metricsRecorder, answers = false)
val modelInstance = Bpmn
.createExecutableProcess()
.startEvent()
.serviceTask().zeebeJobType(PASS_NAME)
.endEvent()
.done()
val deploymentEvent =
zeebeClient.newDeployResourceCommand().addProcessModel(modelInstance, "pass.bpmn").send().join()

// when
zeebeClient.newCreateInstanceCommand()
.processDefinitionKey(deploymentEvent.processes.first().processDefinitionKey).withResult().send().join()

// then
verify(exactly = 1) {
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB,
MetricsRecorder.ACTION_ACTIVATED,
PASS_NAME
)
}
}

@Test
fun `should increase metrics on error`() {
// given
clearMocks(metricsRecorder, answers = false)
val modelInstance =
Bpmn.createExecutableProcess().startEvent().serviceTask().zeebeJobType(THROW_NAME).zeebeJobRetries("0")
.endEvent().done()
val deploymentEvent =
zeebeClient.newDeployResourceCommand().addProcessModel(modelInstance, "throw.bpmn").send().join()

// when
assertThatThrownBy {
zeebeClient.newCreateInstanceCommand()
.processDefinitionKey(deploymentEvent.processes.first().processDefinitionKey).withResult().send().join()
}

// then
verify(exactly = 1) {
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB,
MetricsRecorder.ACTION_ACTIVATED,
THROW_NAME
)
}
verify(exactly = 1) {
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB,
MetricsRecorder.ACTION_FAILED,
THROW_NAME
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import io.camunda.zeebe.client.api.worker.JobClient
import io.camunda.zeebe.model.bpmn.Bpmn
import io.camunda.zeebe.process.test.assertions.BpmnAssert
import io.camunda.zeebe.spring.client.config.ZeebeClientStarterAutoConfiguration
import io.camunda.zeebe.spring.client.metrics.DefaultNoopMetricsRecorder
import io.camunda.zeebe.spring.test.ZeebeSpringTest
import io.mockk.coEvery
import io.mockk.coVerify
Expand Down Expand Up @@ -83,7 +84,7 @@ class WorkerErrorIntegrationTest {
}
}
coEvery { jobErrorHandler.handleError(any(), any(), any()) } coAnswers {
DefaultSpringZeebeErrorHandler(mockJobHandler)
DefaultSpringZeebeErrorHandler(mockJobHandler, metricsRecorder = DefaultNoopMetricsRecorder())
.handleError(
it.invocation.args[0] as Exception,
it.invocation.args[1] as ActivatedJob,
Expand Down

0 comments on commit 619bad8

Please sign in to comment.