Skip to content

Commit

Permalink
Put delegate calls in try / finally
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Dec 11, 2024
1 parent 63200d6 commit 67e1e7b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
3 changes: 3 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ dependencies {
// Spring Kafka Test library
testImplementation 'org.springframework.kafka:spring-kafka-test:2.7.9' // Version compatible with Kafka 2.7.x
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-0.11')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-clients-3.8')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-streams-0.11')
testRuntimeOnly project(':dd-java-agent:instrumentation:kafka-streams-1.0')
}

configurations.testRuntimeClasspath {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ public void onStartup(ConnectorTaskId connectorTaskId) {

@Override
public void onPause(ConnectorTaskId connectorTaskId) {
delegate.onPause(connectorTaskId);
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
try {
delegate.onPause(connectorTaskId);
} finally {
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
}
}

@Override
Expand All @@ -31,13 +34,19 @@ public void onResume(ConnectorTaskId connectorTaskId) {

@Override
public void onFailure(ConnectorTaskId connectorTaskId, Throwable throwable) {
delegate.onFailure(connectorTaskId, throwable);
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
try {
delegate.onFailure(connectorTaskId, throwable);
} finally {
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
}
}

@Override
public void onShutdown(ConnectorTaskId connectorTaskId) {
delegate.onShutdown(connectorTaskId);
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
try {
delegate.onShutdown(connectorTaskId);
} finally {
AgentTracer.get().getDataStreamsMonitoring().clearThreadServiceName();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,21 @@ class ConnectWorkerInstrumentationTest extends AgentTestRunner {

StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
verifyAll(first) {
edgeTags == ["direction:out", "kafka_cluster_id:$clusterId", "topic:test-topic", "type:kafka"]
edgeTags.size() == 4
assert [
"direction:out",
"topic:test-topic",
"type:kafka"
].every( tag -> edgeTags.contains(tag) )
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
verifyAll(second) {
edgeTags == [
assert [
"direction:in",
"group:test-consumer-group",
"kafka_cluster_id:$clusterId",
"topic:test-topic",
"type:kafka"
]
edgeTags.size() == 5
].every( tag -> edgeTags.contains(tag) )
}
TEST_DATA_STREAMS_WRITER.getServices().contains('file-source-connector')

Expand Down

0 comments on commit 67e1e7b

Please sign in to comment.