diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle b/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle index 117bff84bfb..d5717e78304 100644 --- a/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle @@ -29,6 +29,9 @@ dependencies { // Spring Kafka Test library testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.9' // Version compatible with Kafka 2.7.x testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11') + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-3.8') + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-streams-0.11') + testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-streams-1.0') } configurations.testRuntimeClasspath { diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java index 871bc326693..c30ab4062e1 100644 --- a/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/main/java/datadog/trace/instrumentation/kafka_connect/TaskListener.java @@ -19,8 +19,11 @@ public void onStartup(ConnectorTaskId connectorTaskId) { @Override public void onPause(ConnectorTaskId connectorTaskId) { - delegate.onPause(connectorTaskId); - AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + try { + delegate.onPause(connectorTaskId); + } finally { + AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + } } @Override @@ -31,13 +34,19 @@ public void onResume(ConnectorTaskId connectorTaskId) { @Override public void onFailure(ConnectorTaskId connectorTaskId, Throwable throwable) { - delegate.onFailure(connectorTaskId, throwable); - AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + try { + delegate.onFailure(connectorTaskId, throwable); + } finally { + AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + } } @Override public void onShutdown(ConnectorTaskId connectorTaskId) { - delegate.onShutdown(connectorTaskId); - AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + try { + delegate.onShutdown(connectorTaskId); + } finally { + AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName(); + } } } diff --git a/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy index 5aa97b3ed5f..48c69d7d115 100644 --- a/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/kafka-connect-0.11/src/test/groovy/ConnectWorkerInstrumentationTest.groovy @@ -150,20 +150,21 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner { StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 } verifyAll(first) { - edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:test-topic", "type:kafka"] - edgeTags.size() == 4 + assert [ + "direction:out", + "topic:test-topic", + "type:kafka" + ].every( tag -> edgeTags.contains(tag) ) } StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash } verifyAll(second) { - edgeTags == [ + assert [ "direction:in", "group:test-consumer-group", - "kafka_cluster_id:$clusterId", "topic:test-topic", "type:kafka" - ] - edgeTags.size() == 5 + ].every( tag -> edgeTags.contains(tag) ) } TEST_DATA_STREAMS_WRITER.getServices().contains('file-source-connector')