Skip to content

Commit

Permalink
Implement kafka-streams-spans-3.7.0 Instrumentation Module (#2095)
Browse files Browse the repository at this point in the history
  • Loading branch information
deleonenriqueta authored Nov 7, 2024
1 parent 9aff370 commit 4acef47
Show file tree
Hide file tree
Showing 10 changed files with 350 additions and 0 deletions.
22 changes: 22 additions & 0 deletions instrumentation/kafka-streams-spans-3.7.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
jar {
manifest {
attributes 'Implementation-Title': 'com.newrelic.instrumentation.kafka-streams-spans-3.7.0', 'Enabled': 'false',
'Implementation-Title-Alias': 'kafka-streams-spans'
}
}

dependencies {
implementation(project(":agent-bridge"))
implementation("org.apache.kafka:kafka-streams:3.7.0")

testImplementation("org.testcontainers:kafka:1.16.3")
}

verifyInstrumentation {
passesOnly 'org.apache.kafka:kafka-streams:[3.7.0,)'
}

site {
title 'Kafka'
type 'Messaging'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka.streams;

import java.util.concurrent.ConcurrentHashMap;

// A global concurrent hashmap that maps client ids to application id with a possible suffix containing a configured client id.
public class ClientIdToAppIdWithSuffixMap {
private static final ConcurrentHashMap<String, String> applicationIdMap = new ConcurrentHashMap<>();

public ClientIdToAppIdWithSuffixMap() {}

public static ConcurrentHashMap<String, String> get() {
return applicationIdMap;
}

public static String getAppIdOrDefault(String clientId, String defaultId) {
return applicationIdMap.getOrDefault(clientId, defaultId);
}

public static void put(String clientId, String applicationId) {
applicationIdMap.put(clientId, applicationId);
}

public static void remove(String clientId) {
applicationIdMap.remove(clientId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka.streams;

// Represents a thread-local state for a transaction
public class LoopState {
public final static ThreadLocal<LoopState> LOCAL = new ThreadLocal<>();
private int recordsPolled;
private double totalProcessed;

public LoopState() {
clear();
}

public void clear() {
recordsPolled = 0;
totalProcessed = 0;
}

public int getRecordsPolled() {
return recordsPolled;
}

public void incRecordsPolled(int recordsPolled) {
this.recordsPolled += recordsPolled;
}

public double getTotalProcessed() {
return totalProcessed;
}

public void incTotalProcessed(double totalProcessed) {
this.totalProcessed += totalProcessed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package com.nr.instrumentation.kafka.streams;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.TransactionNamePriority;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsSpansUtil {
private StreamsSpansUtil() {}

// Returns application id and if client.id is set, is added as a suffix seperated by /
public static String getAppIdWithClientIdSuffix(StreamsConfig streamsConfig) {
String applicationId = streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG);
String clientId = streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId == null || clientId.length() <= 0) {
return applicationId;
}
return applicationId + "/" + clientId;
}

public static String getAppIdWithSuffix(String threadName) {
final String defaultAppId = "APPLICATION_ID_UNKNOWN";
String nrClientId = StreamsSpansUtil.parseClientId(threadName);
if (nrClientId == null) {
return defaultAppId;
}
// Gets the application id with a possible suffix using a global hashmap.
return ClientIdToAppIdWithSuffixMap.getAppIdOrDefault(nrClientId, defaultAppId);

}

// Parses the client id out of the thread name. Could potentially cause a silent failure.
private static String parseClientId(String threadName) {
int idx = threadName.lastIndexOf("-StreamThread-");
if (idx < 0) {
return null;
}
return threadName.substring(0, idx);
}

public static void initTransaction(String applicationIdWithSuffix) {
LoopState.LOCAL.set(new LoopState());
NewRelic.getAgent().getTransaction().setTransactionName(TransactionNamePriority.FRAMEWORK_LOW, false,
"Message", "Kafka/Streams/" + applicationIdWithSuffix);
}

// Records number of records poll to loop state
public static void recordPolledToLoopState(ConsumerRecords<?, ?> records) {
LoopState state = LoopState.LOCAL.get();
if (state != null) {
int polled = records == null ? 0 : records.count();
state.incRecordsPolled(polled);
}
}

public static void incTotalProcessedToLoopState(double processed) {
LoopState state = LoopState.LOCAL.get();
if (state != null) {
state.incTotalProcessed(processed);
}

}

public static void endTransaction() {
LoopState state = LoopState.LOCAL.get();
if (state != null && state.getRecordsPolled() == 0 && state.getTotalProcessed() == 0) {
NewRelic.getAgent().getTransaction().ignore();
}
LoopState.LOCAL.remove();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.apache.kafka.streams;

import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.streams.ClientIdToAppIdWithSuffixMap;
import com.nr.instrumentation.kafka.streams.StreamsSpansUtil;
import org.apache.kafka.streams.errors.StreamsException;

@Weave(originalName = "org.apache.kafka.streams.KafkaStreams")
public class KafkaStreams_Instrumentation {
private final String clientId = Weaver.callOriginal();
private final StreamsConfig applicationConfigs = Weaver.callOriginal();

public synchronized void start() throws IllegalStateException, StreamsException {
ClientIdToAppIdWithSuffixMap.put(clientId, StreamsSpansUtil.getAppIdWithClientIdSuffix(applicationConfigs));
Weaver.callOriginal();
}

public void close() {
String nrClientId = this.clientId;
Weaver.callOriginal();
ClientIdToAppIdWithSuffixMap.remove(nrClientId);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams.processor.internals;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.NewField;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.WeaveAllConstructors;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.streams.StreamsSpansUtil;
import org.apache.kafka.clients.consumer.ConsumerRecords;

import java.time.Duration;

@Weave(originalName = "org.apache.kafka.streams.processor.internals.StreamThread")
public abstract class StreamThread_Instrumentation extends Thread {
@NewField
private String nrApplicationIdWithSuffix;

// These methods runs once per each event loop iteration
@Trace(dispatcher = true)
void runOnceWithProcessingThreads() {
initTransactionIfNeeded();

try {
Weaver.callOriginal();
} catch (Throwable t) {
NewRelic.noticeError(t);
throw t;
} finally {
StreamsSpansUtil.endTransaction();
}
}

@Trace(dispatcher = true)
void runOnceWithoutProcessingThreads() {
initTransactionIfNeeded();

try {
Weaver.callOriginal();
} catch (Throwable t) {
NewRelic.noticeError(t);
throw t;
} finally {
StreamsSpansUtil.endTransaction();
}
}

@Trace
private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
ConsumerRecords<byte[], byte[]> records = Weaver.callOriginal();
StreamsSpansUtil.recordPolledToLoopState(records);
return records;
}

@WeaveAllConstructors
public StreamThread_Instrumentation() {
this.nrApplicationIdWithSuffix = null;
}

private void initTransactionIfNeeded() {
if (this.nrApplicationIdWithSuffix == null) {
this.nrApplicationIdWithSuffix = StreamsSpansUtil.getAppIdWithSuffix(this.getName());
}
StreamsSpansUtil.initTransaction(this.nrApplicationIdWithSuffix);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams.processor.internals;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

@Weave(originalName = "org.apache.kafka.streams.processor.internals.StreamsProducer")
public class StreamsProducer_Instrumentation {
@Trace(leaf = true, excludeFromTransactionTrace = true)
Future<RecordMetadata> send(final ProducerRecord<byte[], byte[]> record, final Callback callback) {
if (AgentBridge.getAgent().getTransaction(false) != null) {
NewRelic.getAgent().getTracedMethod().setMetricName("MessageBroker/Kafka/Topic/Produce/" + record.topic());
}
return Weaver.callOriginal();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams.processor.internals;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import com.nr.instrumentation.kafka.streams.StreamsSpansUtil;
import org.apache.kafka.common.utils.Time;

@Weave(originalName = "org.apache.kafka.streams.processor.internals.TaskManager")
public class TaskManager_Instrumentation {

int process(final int maxNumRecords, final Time time) {
int processed = Weaver.callOriginal();
if (AgentBridge.getAgent().getTransaction(false) != null) {
StreamsSpansUtil.incTotalProcessedToLoopState(processed);
}
return processed;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
*
* * Copyright 2024 New Relic Corporation. All rights reserved.
* * SPDX-License-Identifier: Apache-2.0
*
*/
package org.apache.kafka.streams.processor.internals;

import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;
import com.newrelic.api.agent.weaver.Weaver;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

@Weave(type = MatchType.Interface, originalName = "org.apache.kafka.streams.processor.internals.Task")
public abstract class Task_Instrumentation {

@Trace
public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) {
NewRelic.getAgent().getTransaction().getTracedMethod().setMetricName(
"MessageBroker/Kafka/Streams/Task/AddRecords/ByPartition/Topic/Named/" + partition.topic());
Weaver.callOriginal();
}
}
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ include 'instrumentation:kafka-streams-spans-2.0.0'
include 'instrumentation:kafka-streams-spans-2.1.0'
include 'instrumentation:kafka-streams-spans-2.6.0'
include 'instrumentation:kafka-streams-spans-3.2.0'
include 'instrumentation:kafka-streams-spans-3.7.0'
include 'instrumentation:lettuce-4.3'
include 'instrumentation:lettuce-5.0'
include 'instrumentation:lettuce-6.0'
Expand Down

0 comments on commit 4acef47

Please sign in to comment.