From c2c5d80f72c1ab7d69dd606348a2d09a99ddd36e Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Sat, 7 Dec 2024 05:10:27 +0200 Subject: [PATCH] Add an option to disable automatic kafka interceptor configuration in spring starter (#12833) --- ...ListenerContainerFactoryPostProcessor.java | 52 ++++++++++++------- ...KafkaInstrumentationAutoConfiguration.java | 20 ++++++- ...itional-spring-configuration-metadata.json | 6 +++ 3 files changed, 57 insertions(+), 21 deletions(-) diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java index 0842052409e7..d26c2d684526 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/ConcurrentKafkaListenerContainerFactoryPostProcessor.java @@ -5,43 +5,55 @@ package io.opentelemetry.instrumentation.spring.autoconfigure.internal.instrumentation.kafka; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; -import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; -import org.springframework.beans.factory.ObjectProvider; +import java.lang.reflect.Field; +import java.util.function.Supplier; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.listener.BatchInterceptor; +import org.springframework.kafka.listener.RecordInterceptor; class ConcurrentKafkaListenerContainerFactoryPostProcessor implements BeanPostProcessor { - private final ObjectProvider openTelemetryProvider; - private final ObjectProvider configPropertiesProvider; + private final Supplier springKafkaTelemetry; ConcurrentKafkaListenerContainerFactoryPostProcessor( - ObjectProvider openTelemetryProvider, - ObjectProvider configPropertiesProvider) { - this.openTelemetryProvider = openTelemetryProvider; - this.configPropertiesProvider = configPropertiesProvider; + Supplier springKafkaTelemetry) { + this.springKafkaTelemetry = springKafkaTelemetry; } + @SuppressWarnings("unchecked") @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (!(bean instanceof ConcurrentKafkaListenerContainerFactory)) { return bean; } - ConcurrentKafkaListenerContainerFactory listenerContainerFactory = - (ConcurrentKafkaListenerContainerFactory) bean; - SpringKafkaTelemetry springKafkaTelemetry = - SpringKafkaTelemetry.builder(openTelemetryProvider.getObject()) - .setCaptureExperimentalSpanAttributes( - configPropertiesProvider - .getObject() - .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) - .build(); - listenerContainerFactory.setBatchInterceptor(springKafkaTelemetry.createBatchInterceptor()); - listenerContainerFactory.setRecordInterceptor(springKafkaTelemetry.createRecordInterceptor()); + ConcurrentKafkaListenerContainerFactory listenerContainerFactory = + (ConcurrentKafkaListenerContainerFactory) bean; + SpringKafkaTelemetry springKafkaTelemetry = this.springKafkaTelemetry.get(); + + // use reflection to read existing values to avoid overwriting user configured interceptors + BatchInterceptor batchInterceptor = + readField(listenerContainerFactory, "batchInterceptor", BatchInterceptor.class); + RecordInterceptor recordInterceptor = + readField(listenerContainerFactory, "recordInterceptor", RecordInterceptor.class); + listenerContainerFactory.setBatchInterceptor( + springKafkaTelemetry.createBatchInterceptor(batchInterceptor)); + listenerContainerFactory.setRecordInterceptor( + springKafkaTelemetry.createRecordInterceptor(recordInterceptor)); return listenerContainerFactory; } + + private static T readField(Object container, String filedName, Class fieldType) { + try { + Field field = AbstractKafkaListenerContainerFactory.class.getDeclaredField(filedName); + field.setAccessible(true); + return fieldType.cast(field.get(container)); + } catch (Exception exception) { + return null; + } + } } diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java index 739e0275201c..80734c2f2c64 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/java/io/opentelemetry/instrumentation/spring/autoconfigure/internal/instrumentation/kafka/KafkaInstrumentationAutoConfiguration.java @@ -8,9 +8,11 @@ import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.instrumentation.kafkaclients.v2_6.KafkaTelemetry; import io.opentelemetry.instrumentation.spring.autoconfigure.internal.ConditionalOnEnabledInstrumentation; +import io.opentelemetry.instrumentation.spring.kafka.v2_7.SpringKafkaTelemetry; import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties; import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -33,13 +35,29 @@ DefaultKafkaProducerFactoryCustomizer otelKafkaProducerFactoryCustomizer( return producerFactory -> producerFactory.addPostProcessor(kafkaTelemetry::wrap); } + @Bean + static SpringKafkaTelemetry getTelemetry( + ObjectProvider openTelemetryProvider, + ObjectProvider configPropertiesProvider) { + return SpringKafkaTelemetry.builder(openTelemetryProvider.getObject()) + .setCaptureExperimentalSpanAttributes( + configPropertiesProvider + .getObject() + .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) + .build(); + } + // static to avoid "is not eligible for getting processed by all BeanPostProcessors" warning @Bean + @ConditionalOnProperty( + name = "otel.instrumentation.kafka.autoconfigure-interceptor", + havingValue = "true", + matchIfMissing = true) static ConcurrentKafkaListenerContainerFactoryPostProcessor otelKafkaListenerContainerFactoryBeanPostProcessor( ObjectProvider openTelemetryProvider, ObjectProvider configPropertiesProvider) { return new ConcurrentKafkaListenerContainerFactoryPostProcessor( - openTelemetryProvider, configPropertiesProvider); + () -> getTelemetry(openTelemetryProvider, configPropertiesProvider)); } } diff --git a/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 7ffeaf585b8e..a0b04d97f9fc 100644 --- a/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/instrumentation/spring/spring-boot-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -357,6 +357,12 @@ "description": "Enable the capture of experimental Kafka span attributes.", "defaultValue": false }, + { + "name": "otel.instrumentation.kafka.autoconfigure-interceptor", + "type": "java.lang.Boolean", + "description": "Enable automatic configuration of tracing interceptors on ConcurrentKafkaListenerContainerFactory using a BeanPostProcessor. You may disable this if you wish to manually configure these interceptors.", + "defaultValue": true + }, { "name": "otel.instrumentation.mongo.enabled", "type": "java.lang.Boolean",