Skip to content

Commit

Permalink
Add support for Kafka connect
Browse files Browse the repository at this point in the history
  • Loading branch information
piochelepiotr committed Nov 18, 2024
1 parent c2eb6cf commit ff9bbf2
Show file tree
Hide file tree
Showing 7 changed files with 906 additions and 0 deletions.
36 changes: 36 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
muzzle {
pass {
group = "org.apache.kafka"
module = "connect-runtime"
versions = "[0.11.0.0,)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

dependencies {
compileOnly group: 'org.apache.kafka', name: 'connect-runtime', version: '0.11.0.0'

testImplementation group: 'org.apache.kafka', name: 'connect-runtime', version: '0.11.0.0'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka', version: '1.3.3.RELEASE'
testImplementation group: 'org.springframework.kafka', name: 'spring-kafka-test', version: '1.3.3.RELEASE'
testImplementation group: 'javax.xml.bind', name: 'jaxb-api', version: '2.2.3'
testImplementation group: 'org.assertj', name: 'assertj-core', version: '2.9.+'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.19.0'
testRuntimeOnly project(':dd-java-agent:instrumentation:spring-scheduling-3.1')
testImplementation(testFixtures(project(':dd-java-agent:agent-iast')))


testRuntimeOnly project(':dd-java-agent:instrumentation:iast-instrumenter')
testRuntimeOnly project(':dd-java-agent:instrumentation:java-lang')
testRuntimeOnly project(':dd-java-agent:instrumentation:java-io')
testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core')
testRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core:jackson-core-2.8')
testImplementation(group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.9.10')
}

configurations.testRuntimeClasspath {
// spock-core depends on assertj version that is not compatible with kafka-clients
resolutionStrategy.force 'org.assertj:assertj-core:2.9.1'
}
258 changes: 258 additions & 0 deletions dd-java-agent/instrumentation/kafka-connect-0.11/gradle.lockfile

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.trace.instrumentation.kafka_connect;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import net.bytebuddy.matcher.ElementMatchers;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.runtime.TaskStatus.Listener;

import java.util.Arrays;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.*;

@AutoService(InstrumenterModule.class)
public final class ConnectWorkerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy {

static final String TARGET_TYPE = "org.apache.kafka.connect.runtime.WorkerTask";

public ConnectWorkerInstrumentation() {
super("kafka", "kafka-connect");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".TaskListener",
};
}

@Override
public String hierarchyMarkerType() {
return TARGET_TYPE;
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isConstructor().and(takesArgument(0, named("org.apache.kafka.connect.util.ConnectorTaskId")))
.and(takesArgument(1, named("org.apache.kafka.connect.runtime.TaskStatus$Listener"))),
ConnectWorkerInstrumentation.class.getName() + "$ConstructorAdvice");
}

public static class ConstructorAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void wrap(
@Advice.Argument(value = 0, readOnly = true) ConnectorTaskId id,
@Advice.Argument(value = 1, readOnly = false) Listener statusListener
) {
statusListener = new TaskListener(statusListener);
System.out.println("building worker task!!" + id.connector());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package datadog.trace.instrumentation.kafka_connect;

import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import org.apache.kafka.connect.runtime.TaskStatus.Listener;
import org.apache.kafka.connect.util.ConnectorTaskId;

public class TaskListener implements Listener{
final private Listener delegate;
public TaskListener(Listener delegate) {
this.delegate = delegate;

}
@Override
public void onStartup(ConnectorTaskId connectorTaskId) {
System.out.println("start up" + connectorTaskId.connector() + " on thread " + Thread.currentThread().getId());
AgentTracer.get()
.getDataStreamsMonitoring()
.setThreadServiceName(Thread.currentThread().getId(), connectorTaskId.connector());
delegate.onStartup(connectorTaskId);
}

@Override
public void onPause(ConnectorTaskId connectorTaskId) {
System.out.println("pause" + connectorTaskId.connector());
delegate.onPause(connectorTaskId);
AgentTracer.get()
.getDataStreamsMonitoring()
.clearThreadServiceName(Thread.currentThread().getId());
}

@Override
public void onResume(ConnectorTaskId connectorTaskId) {
System.out.println("resume" + connectorTaskId.connector());
delegate.onResume(connectorTaskId);
AgentTracer.get()
.getDataStreamsMonitoring()
.setThreadServiceName(Thread.currentThread().getId(), connectorTaskId.connector());
}

@Override
public void onFailure(ConnectorTaskId connectorTaskId, Throwable throwable) {
System.out.println("failure" + connectorTaskId.connector());
delegate.onFailure(connectorTaskId, throwable);
AgentTracer.get()
.getDataStreamsMonitoring()
.clearThreadServiceName(Thread.currentThread().getId());
}

@Override
public void onShutdown(ConnectorTaskId connectorTaskId) {
System.out.println("shutdown" + connectorTaskId.connector());
delegate.onShutdown(connectorTaskId);
AgentTracer.get()
.getDataStreamsMonitoring()
.clearThreadServiceName(Thread.currentThread().getId());
}
}
Loading

0 comments on commit ff9bbf2

Please sign in to comment.