Skip to content

Commit

Permalink
Fix capturing context in log4j library instrumentation with async logger
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed Sep 5, 2024
1 parent c8e2222 commit d98b2e9
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies {
}

// this is needed for the async logging test
testImplementation("com.lmax:disruptor:3.4.2")
testLibrary("com.lmax:disruptor:3.4.2")
}

tasks.withType<Test>().configureEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.incubator.config.internal.InstrumentationConfig;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.ContextDataAccessor;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.LogEventMapper;
Expand Down Expand Up @@ -77,7 +78,15 @@ public static void capture(
threadId = currentThread.getId();
}
mapper.mapLogEvent(
builder, message, level, marker, throwable, contextData, threadName, threadId);
builder,
message,
level,
marker,
throwable,
contextData,
threadName,
threadId,
Context.current());
builder.setTimestamp(Instant.now());
builder.emit();
}
Expand All @@ -87,12 +96,12 @@ private enum ContextDataAccessorImpl implements ContextDataAccessor<Map<String,

@Override
@Nullable
public Object getValue(Map<String, String> contextData, String key) {
public String getValue(Map<String, String> contextData, String key) {
return contextData.get(key);
}

@Override
public void forEach(Map<String, String> contextData, BiConsumer<String, Object> action) {
public void forEach(Map<String, String> contextData, BiConsumer<String, String> action) {
contextData.forEach(action);
}
}
Expand Down
17 changes: 15 additions & 2 deletions instrumentation/log4j/log4j-appender-2.17/library/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,26 @@ dependencies {
library("org.apache.logging.log4j:log4j-core:2.17.0")
annotationProcessor("org.apache.logging.log4j:log4j-core:2.17.0")

implementation(project(":instrumentation:log4j:log4j-context-data:log4j-context-data-2.17:library-autoconfigure"))

testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testLibrary("com.lmax:disruptor:3.3.4")

if (findProperty("testLatestDeps") as Boolean) {
testCompileOnly("biz.aQute.bnd:biz.aQute.bnd.annotation:7.0.0")
}
}

tasks.withType<Test>().configureEach {
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
tasks {
withType<Test>().configureEach {
jvmArgs("-Dotel.instrumentation.common.experimental.controller-telemetry.enabled=true")
}

val testAsyncLogger by registering(Test::class) {
jvmArgs("-DLog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector")
}

check {
dependsOn(testAsyncLogger)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.apache.logging.log4j.message.StringMapMessage;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.apache.logging.log4j.util.ReadOnlyStringMap;
import org.apache.logging.log4j.util.SortedArrayStringMap;

class LogEventToReplay implements LogEvent {

Expand Down Expand Up @@ -59,7 +60,8 @@ class LogEventToReplay implements LogEvent {
this.instant = logEvent.getInstant();
this.thrown = logEvent.getThrown();
this.marker = logEvent.getMarker();
this.contextData = logEvent.getContextData();
// copy context data, context data map may be reused
this.contextData = new SortedArrayStringMap(logEvent.getContextData());
this.threadName = logEvent.getThreadName();
this.threadId = logEvent.getThreadId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,14 @@
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.logs.LogRecordBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.TraceFlags;
import io.opentelemetry.api.trace.TraceState;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.ContextDataAccessor;
import io.opentelemetry.instrumentation.log4j.appender.v2_17.internal.LogEventMapper;
import io.opentelemetry.instrumentation.log4j.contextdata.v2_17.internal.ContextDataKeys;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -272,6 +278,28 @@ private void emit(OpenTelemetry openTelemetry, LogEvent event) {
LogRecordBuilder builder =
openTelemetry.getLogsBridge().loggerBuilder(instrumentationName).build().logRecordBuilder();
ReadOnlyStringMap contextData = event.getContextData();
Context context = Context.current();
// when using async logger we'll be executing on a different thread than what started logging
// reconstruct the context from context data
if (context == Context.root()) {
ContextDataAccessor<ReadOnlyStringMap> contextDataAccessor = ContextDataAccessorImpl.INSTANCE;
String traceId = contextDataAccessor.getValue(contextData, ContextDataKeys.TRACE_ID_KEY);
String spanId = contextDataAccessor.getValue(contextData, ContextDataKeys.SPAN_ID_KEY);
String traceFlags =
contextDataAccessor.getValue(contextData, ContextDataKeys.TRACE_FLAGS_KEY);
if (traceId != null && spanId != null && traceFlags != null) {
context =
Context.root()
.with(
Span.wrap(
SpanContext.create(
traceId,
spanId,
TraceFlags.fromHex(traceFlags, 0),
TraceState.getDefault())));
}
}

mapper.mapLogEvent(
builder,
event.getMessage(),
Expand All @@ -280,7 +308,8 @@ private void emit(OpenTelemetry openTelemetry, LogEvent event) {
event.getThrown(),
contextData,
event.getThreadName(),
event.getThreadId());
event.getThreadId(),
context);

Instant timestamp = event.getInstant();
if (timestamp != null) {
Expand All @@ -297,12 +326,12 @@ private enum ContextDataAccessorImpl implements ContextDataAccessor<ReadOnlyStri

@Override
@Nullable
public Object getValue(ReadOnlyStringMap contextData, String key) {
public String getValue(ReadOnlyStringMap contextData, String key) {
return contextData.getValue(key);
}

@Override
public void forEach(ReadOnlyStringMap contextData, BiConsumer<String, Object> action) {
public void forEach(ReadOnlyStringMap contextData, BiConsumer<String, String> action) {
contextData.forEach(action::accept);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
public interface ContextDataAccessor<T> {

@Nullable
Object getValue(T contextData, String key);
String getValue(T contextData, String key);

void forEach(T contextData, BiConsumer<String, Object> action);
void forEach(T contextData, BiConsumer<String, String> action);
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ public void mapLogEvent(
@Nullable Throwable throwable,
T contextData,
String threadName,
long threadId) {
long threadId,
Context context) {

AttributesBuilder attributes = Attributes.builder();

Expand Down Expand Up @@ -116,8 +117,7 @@ public void mapLogEvent(
}

builder.setAllAttributes(attributes.build());

builder.setContext(Context.current());
builder.setContext(context);
}

// visible for testing
Expand Down Expand Up @@ -165,16 +165,16 @@ void captureContextDataAttributes(AttributesBuilder attributes, T contextData) {
contextData,
(key, value) -> {
if (value != null) {
attributes.put(getContextDataAttributeKey(key), value.toString());
attributes.put(getContextDataAttributeKey(key), value);
}
});
return;
}

for (String key : captureContextDataAttributes) {
Object value = contextDataAccessor.getValue(contextData, key);
String value = contextDataAccessor.getValue(contextData, key);
if (value != null) {
attributes.put(getContextDataAttributeKey(key), value.toString());
attributes.put(getContextDataAttributeKey(key), value);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.semconv.ExceptionAttributes;
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
import java.time.Instant;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -93,7 +93,12 @@ void logNoSpan() {
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("log message 1")
.hasAttributes(Attributes.empty()));
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId())));
}

@Test
Expand Down Expand Up @@ -123,6 +128,9 @@ void logWithExtras() {
.hasSeverity(Severity.INFO)
.hasSeverityText("INFO")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(
ExceptionAttributes.EXCEPTION_TYPE,
IllegalStateException.class.getName()),
Expand Down Expand Up @@ -158,7 +166,13 @@ void logContextData() {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("log message 1")
.hasAttributesSatisfyingExactly(
equalTo(stringKey("key1"), "val1"), equalTo(stringKey("key2"), "val2")));
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("key1"), "val1"),
equalTo(stringKey("key2"), "val2")));
}

@Test
Expand All @@ -177,6 +191,11 @@ void logStringMapMessage() {
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}
Expand All @@ -198,6 +217,11 @@ void logStringMapMessageWithSpecialAttribute() {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("val2")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1")));
}

Expand Down Expand Up @@ -233,6 +257,11 @@ void logStructuredDataMessage() {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("a message")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME,
Thread.currentThread().getName()),
equalTo(
ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import io.opentelemetry.semconv.incubating.ThreadIncubatingAttributes;
import org.apache.logging.log4j.message.StringMapMessage;
import org.apache.logging.log4j.message.StructuredDataMessage;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
Expand Down Expand Up @@ -43,8 +45,16 @@ void executeAfterLogsExecution() {
OpenTelemetryAppender.install(testing.getOpenTelemetry());
}

private static boolean isAsyncLogger() {
return logger.getClass().getName().contains("AsyncLogger");
}

@Test
void twoLogs() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());

logger.info("log message 1");
logger.info(
"log message 2"); // Won't be instrumented because cache size is 1 (see log4j2.xml file)
Expand All @@ -61,6 +71,10 @@ void twoLogs() {

@Test
void twoLogsStringMapMessage() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());

StringMapMessage message = new StringMapMessage();
message.put("key1", "val1");
message.put("key2", "val2");
Expand All @@ -81,12 +95,19 @@ void twoLogsStringMapMessage() {
.hasResource(resource)
.hasInstrumentationScope(instrumentationScopeInfo)
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}

@Test
void twoLogsStructuredDataMessage() {
// with async logger OpenTelemetryAppender.install may be called before second log message is
// captured, so we get 2 log records instead of the expected 1
Assumptions.assumeFalse(isAsyncLogger());

StructuredDataMessage message = new StructuredDataMessage("an id", "a message", "a type");
message.put("key1", "val1");
message.put("key2", "val2");
Expand All @@ -107,6 +128,9 @@ void twoLogsStructuredDataMessage() {
.hasInstrumentationScope(instrumentationScopeInfo)
.hasBody("a message")
.hasAttributesSatisfyingExactly(
equalTo(
ThreadIncubatingAttributes.THREAD_NAME, Thread.currentThread().getName()),
equalTo(ThreadIncubatingAttributes.THREAD_ID, Thread.currentThread().getId()),
equalTo(stringKey("log4j.map_message.key1"), "val1"),
equalTo(stringKey("log4j.map_message.key2"), "val2")));
}
Expand Down
Loading

0 comments on commit d98b2e9

Please sign in to comment.