diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/ComplexPropagationTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/ComplexPropagationTest.groovy deleted file mode 100644 index c5c32311b105..000000000000 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/ComplexPropagationTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class ComplexPropagationTest extends AbstractComplexPropagationTest implements AgentTestTrait { - @Override - Class additionalContextClass() { - null - } -} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy deleted file mode 100644 index 07bf969f66a6..000000000000 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamProducerTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements AgentTestTrait { - @Override - Class additionalContextClass() { - null - } -} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamRabbitTest.groovy deleted file mode 100644 index fa7447ea8b55..000000000000 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringCloudStreamRabbitTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest implements AgentTestTrait { - @Override - Class additionalContextClass() { - null - } -} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy deleted file mode 100644 index 33fc7906b3bf..000000000000 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationAndRabbitTest.groovy +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes -import io.opentelemetry.semconv.NetworkAttributes - -import static io.opentelemetry.api.trace.SpanKind.CLIENT -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification implements WithRabbitProducerConsumerTrait { - def setupSpec() { - startRabbit() - } - - def cleanupSpec() { - stopRabbit() - } - - def "should cooperate with existing RabbitMQ instrumentation"() { - when: - runWithSpan("parent") { - producerContext.getBean("producer", Runnable).run() - } - - then: - assertTraces(2) { - trace(0, 7) { - span(0) { - name "parent" - attributes {} - } - span(1) { - name "producer" - childOf span(0) - attributes {} - } - span(2) { - // span created by rabbitmq instrumentation - name "exchange.declare" - childOf span(1) - kind CLIENT - attributes { - "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq" - } - } - span(3) { - // span created by rabbitmq instrumentation - name "testTopic publish" - childOf span(1) - kind PRODUCER - attributes { - "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String - } - } - // spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue immediately after receiving - // that's why the rabbitmq CONSUMER span will never have any child span (and propagate context, actually) - span(4) { - // span created by rabbitmq instrumentation - name ~/testTopic.anonymous.[-\w]+ process/ - childOf span(3) - kind CONSUMER - attributes { - "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - "$MessagingIncubatingAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String - } - } - // spring-integration will detect that spring-rabbit has already created a consumer span and back off - span(5) { - // span created by spring-rabbit instrumentation - name "testTopic process" - childOf span(3) - kind CONSUMER - attributes { - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq" - "$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testTopic" - "$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process" - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String - "$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long - } - } - span(6) { - name "consumer" - childOf span(5) - attributes {} - } - } - - trace(1, 1) { - span(0) { - // span created by rabbitmq instrumentation - name "basic.ack" - kind CLIENT - attributes { - "$NetworkAttributes.NETWORK_PEER_ADDRESS" { it == "127.0.0.1" || it == "0:0:0:0:0:0:0:1" || it == null } - "$NetworkAttributes.NETWORK_PEER_PORT" Long - "$NetworkAttributes.NETWORK_TYPE" { it == "ipv4" || it == "ipv6" || it == null } - "$MessagingIncubatingAttributes.MESSAGING_SYSTEM" "rabbitmq" - } - } - } - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationTelemetryTest.groovy b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationTelemetryTest.groovy deleted file mode 100644 index 9adea629a504..000000000000 --- a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/groovy/SpringIntegrationTelemetryTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.AgentTestTrait - -class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest implements AgentTestTrait { - @Override - Class additionalContextClass() { - null - } -} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java new file mode 100644 index 000000000000..27ca45e38815 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ComplexPropagationTest extends AbstractComplexPropagationTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + ComplexPropagationTest() { + super(testing, null); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java new file mode 100644 index 000000000000..aea60a23fb7b --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + SpringCloudStreamProducerTest() { + super(testing, null); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java new file mode 100644 index 000000000000..0a48f66a0a33 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + SpringCloudStreamRabbitTest() { + super(testing, null); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationAndRabbitTest.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationAndRabbitTest.java new file mode 100644 index 000000000000..cf1f7882ba4e --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationAndRabbitTest.java @@ -0,0 +1,155 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.semconv.NetworkAttributes; +import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringIntegrationAndRabbitTest { + + @RegisterExtension RabbitExtension rabbit; + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + SpringIntegrationAndRabbitTest() { + rabbit = new RabbitExtension(null); + } + + @Test + void shouldCooperateWithExistingRabbitMqInstrumentation() { + testing.waitForTraces(13); // from rabbitmq instrumentation of startup + testing.clearData(); + + runWithSpan("parent", () -> rabbit.getBean("producer", Runnable.class).run()); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasTotalAttributeCount(0), + span -> + span.hasName("producer").hasParent(trace.getSpan(0)).hasTotalAttributeCount(0), + span -> span.hasName("exchange.declare"), + span -> + span.hasName("exchange.declare") + .hasParent(trace.getSpan(1)) + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + satisfies( + NetworkAttributes.NETWORK_PEER_ADDRESS, + s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)), + satisfies( + NetworkAttributes.NETWORK_PEER_PORT, + l -> l.isInstanceOf(Long.class)), + satisfies( + NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)), + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq")), + span -> span.hasName("queue.declare"), + span -> span.hasName("queue.bind"), + span -> + span.hasName("testTopic publish") + .hasParent(trace.getSpan(1)) + .hasKind(SpanKind.PRODUCER) + .hasAttributesSatisfyingExactly( + satisfies( + NetworkAttributes.NETWORK_PEER_ADDRESS, + s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)), + satisfies( + NetworkAttributes.NETWORK_PEER_PORT, + l -> l.isInstanceOf(Long.class)), + satisfies( + NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)), + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "testTopic"), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + l -> l.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes + .MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, + s -> s.isInstanceOf(String.class))), + // spring-cloud-stream-binder-rabbit listener puts all messages into a BlockingQueue + // immediately after receiving + // that's why the rabbitmq CONSUMER span will never have any child span (and + // propagate context, actually) + span -> + span.satisfies( + spanData -> + assertThat(spanData.getName()) + .matches("testTopic.anonymous.[-\\w]+ process")) + .hasParent(trace.getSpan(6)) + .hasKind(SpanKind.CONSUMER) + .hasAttributesSatisfyingExactly( + satisfies( + NetworkAttributes.NETWORK_PEER_ADDRESS, + s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)), + satisfies( + NetworkAttributes.NETWORK_PEER_PORT, + l -> l.isInstanceOf(Long.class)), + satisfies( + NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)), + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "testTopic"), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + l -> l.isInstanceOf(Long.class)), + satisfies( + MessagingIncubatingAttributes + .MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY, + s -> s.isInstanceOf(String.class))), + // spring-integration will detect that spring-rabbit has already created a consumer + // span and back off + span -> + span.hasName("testTopic process") + .hasParent(trace.getSpan(6)) + .hasKind(SpanKind.CONSUMER) + .hasAttributesSatisfyingExactly( + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq"), + equalTo( + MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME, + "testTopic"), + equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "process"), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID, + s -> s.isInstanceOf(String.class)), + satisfies( + MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE, + l -> l.isInstanceOf(Long.class))), + span -> + span.hasName("consumer").hasParent(trace.getSpan(8)).hasTotalAttributeCount(0)), + trace -> + trace.hasSpansSatisfyingExactly( + span -> + span.hasName("basic.ack") + .hasKind(SpanKind.CLIENT) + .hasAttributesSatisfyingExactly( + satisfies( + NetworkAttributes.NETWORK_PEER_ADDRESS, + s -> s.isIn("127.0.0.1", "0:0:0:0:0:0:0:1", null)), + satisfies( + NetworkAttributes.NETWORK_PEER_PORT, + l -> l.isInstanceOf(Long.class)), + satisfies( + NetworkAttributes.NETWORK_TYPE, s -> s.isIn("ipv4", "ipv6", null)), + equalTo(MessagingIncubatingAttributes.MESSAGING_SYSTEM, "rabbitmq")))); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java new file mode 100644 index 000000000000..2c4f50424ead --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest { + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + SpringIntegrationTelemetryTest() { + super(testing, null); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/ComplexPropagationTest.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/ComplexPropagationTest.groovy deleted file mode 100644 index ff40c296cf51..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/ComplexPropagationTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class ComplexPropagationTest extends AbstractComplexPropagationTest implements LibraryTestTrait { - @Override - Class additionalContextClass() { - GlobalInterceptorSpringConfig - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy deleted file mode 100644 index a73da27e7bc5..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorSpringConfig.groovy +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.integration.config.GlobalChannelInterceptor -import org.springframework.messaging.support.ChannelInterceptor - -import static java.util.Collections.singletonList - -@Configuration -class GlobalInterceptorSpringConfig { - - @GlobalChannelInterceptor - @Bean - ChannelInterceptor otelInterceptor() { - SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) - .setCapturedHeaders(singletonList("test-message-header")) - .build() - .newChannelInterceptor() - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorWithProducerSpanSpringConfig.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorWithProducerSpanSpringConfig.groovy deleted file mode 100644 index 92a18d24804e..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/GlobalInterceptorWithProducerSpanSpringConfig.groovy +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.api.GlobalOpenTelemetry -import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry -import org.springframework.context.annotation.Bean -import org.springframework.context.annotation.Configuration -import org.springframework.integration.config.GlobalChannelInterceptor -import org.springframework.messaging.support.ChannelInterceptor - -@Configuration -class GlobalInterceptorWithProducerSpanSpringConfig { - - @GlobalChannelInterceptor - @Bean - ChannelInterceptor otelInterceptor() { - SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) - .setProducerSpanEnabled(true) - .build() - .newChannelInterceptor() - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy deleted file mode 100644 index 33427f9196b8..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamProducerTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest implements LibraryTestTrait { - @Override - Class additionalContextClass() { - GlobalInterceptorWithProducerSpanSpringConfig - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamRabbitTest.groovy deleted file mode 100644 index e9d659f21778..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringCloudStreamRabbitTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest implements LibraryTestTrait { - @Override - Class additionalContextClass() { - GlobalInterceptorSpringConfig - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringIntegrationTelemetryTest.groovy b/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringIntegrationTelemetryTest.groovy deleted file mode 100644 index b81fbcf40231..000000000000 --- a/instrumentation/spring/spring-integration-4.1/library/src/test/groovy/SpringIntegrationTelemetryTest.groovy +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.LibraryTestTrait - -class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest implements LibraryTestTrait { - @Override - Class additionalContextClass() { - GlobalInterceptorSpringConfig - } -} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java new file mode 100644 index 000000000000..52bf2c4807f3 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/ComplexPropagationTest.java @@ -0,0 +1,20 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class ComplexPropagationTest extends AbstractComplexPropagationTest { + + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + public ComplexPropagationTest() { + super(testing, GlobalInterceptorSpringConfig.class); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorSpringConfig.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorSpringConfig.java new file mode 100644 index 000000000000..49e7c8c04e22 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorSpringConfig.java @@ -0,0 +1,28 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static java.util.Collections.singletonList; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.GlobalChannelInterceptor; +import org.springframework.messaging.support.ChannelInterceptor; + +@Configuration +class GlobalInterceptorSpringConfig { + + @GlobalChannelInterceptor + @Bean + ChannelInterceptor otelInterceptor() { + return SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders(singletonList("test-message-header")) + .build() + .newChannelInterceptor(); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorWithProducerSpanSpringConfig.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorWithProducerSpanSpringConfig.java new file mode 100644 index 000000000000..68784dcec635 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/GlobalInterceptorWithProducerSpanSpringConfig.java @@ -0,0 +1,26 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.spring.integration.v4_1.SpringIntegrationTelemetry; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.config.GlobalChannelInterceptor; +import org.springframework.messaging.support.ChannelInterceptor; + +@Configuration +class GlobalInterceptorWithProducerSpanSpringConfig { + + @GlobalChannelInterceptor + @Bean + ChannelInterceptor otelInterceptor() { + return SpringIntegrationTelemetry.builder(GlobalOpenTelemetry.get()) + .setProducerSpanEnabled(true) + .build() + .newChannelInterceptor(); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java new file mode 100644 index 000000000000..b2f25b143045 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamProducerTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringCloudStreamProducerTest extends AbstractSpringCloudStreamProducerTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + public SpringCloudStreamProducerTest() { + super(testing, GlobalInterceptorWithProducerSpanSpringConfig.class); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java new file mode 100644 index 000000000000..da78855adde3 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringCloudStreamRabbitTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringCloudStreamRabbitTest extends AbstractSpringCloudStreamRabbitTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + public SpringCloudStreamRabbitTest() { + super(testing, GlobalInterceptorSpringConfig.class); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java new file mode 100644 index 000000000000..f8b3dda8f264 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/library/src/test/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/SpringIntegrationTelemetryTest.java @@ -0,0 +1,19 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension; +import org.junit.jupiter.api.extension.RegisterExtension; + +class SpringIntegrationTelemetryTest extends AbstractSpringIntegrationTracingTest { + @RegisterExtension + static final InstrumentationExtension testing = LibraryInstrumentationExtension.create(); + + public SpringIntegrationTelemetryTest() { + super(testing, GlobalInterceptorSpringConfig.class); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractComplexPropagationTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractComplexPropagationTest.groovy deleted file mode 100644 index 0d1ce31d9a9f..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractComplexPropagationTest.groovy +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import org.springframework.boot.SpringApplication -import org.springframework.boot.SpringBootConfiguration -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.boot.context.event.ApplicationReadyEvent -import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.annotation.Bean -import org.springframework.context.event.EventListener -import org.springframework.integration.channel.DirectChannel -import org.springframework.integration.channel.ExecutorChannel -import org.springframework.messaging.Message -import org.springframework.messaging.SubscribableChannel -import org.springframework.messaging.support.MessageBuilder -import spock.lang.Shared - -import java.util.concurrent.BlockingQueue -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.LinkedBlockingQueue -import java.util.stream.Collectors - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER - -abstract class AbstractComplexPropagationTest extends InstrumentationSpecification { - - abstract Class additionalContextClass() - - @Shared - ConfigurableApplicationContext applicationContext - - def setupSpec() { - def contextClasses = [ExternalQueueConfig] - if (additionalContextClass() != null) { - contextClasses += additionalContextClass() - } - - def app = new SpringApplication(contextClasses as Class[]) - app.setDefaultProperties([ - "spring.main.web-application-type": "none" - ]) - applicationContext = app.run() - } - - def cleanupSpec() { - applicationContext?.close() - } - - def "should propagate context through a custom message queue"() { - given: - def sendChannel = applicationContext.getBean("sendChannel", SubscribableChannel) - def receiveChannel = applicationContext.getBean("receiveChannel", SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - receiveChannel.subscribe(messageHandler) - - when: - sendChannel.send(MessageBuilder.withPayload("test") - .setHeader("theAnswer", "42") - .build()) - - then: - messageHandler.join() - - assertTraces(1) { - trace(0, 3) { - // there's no span in the context, so spring-integration adds a CONSUMER one - span(0) { - name "application.sendChannel process" - kind CONSUMER - } - // message is received in a separate thread without any context, so a CONSUMER span with parent - // extracted from the incoming message is created - span(1) { - name "application.receiveChannel process" - childOf span(0) - kind CONSUMER - } - span(2) { - name "handler" - childOf span(1) - } - } - } - - cleanup: - receiveChannel.unsubscribe(messageHandler) - } - - // this setup simulates separate producer/consumer and some "external" message queue in between - @SpringBootConfiguration - @EnableAutoConfiguration - static class ExternalQueueConfig { - @Bean - SubscribableChannel sendChannel() { - new ExecutorChannel(Executors.newSingleThreadExecutor()) - } - - @Bean - SubscribableChannel receiveChannel() { - new DirectChannel() - } - - @Bean - BlockingQueue externalQueue() { - new LinkedBlockingQueue() - } - - @Bean(destroyMethod = "shutdownNow") - ExecutorService consumerThread() { - Executors.newSingleThreadExecutor() - } - - @EventListener(ApplicationReadyEvent) - void initialize() { - sendChannel().subscribe { message -> - externalQueue().offer(Payload.from(message)) - } - - consumerThread().execute({ - while (!Thread.interrupted()) { - def payload = externalQueue().take() - receiveChannel().send(payload.toMessage()) - } - }) - } - } - - static class Payload { - String body - Map headers - - static Payload from(Message message) { - def body = message.payload as String - Map headers = message.headers.entrySet().stream() - .filter({ kv -> kv.value instanceof String }) - .collect(Collectors.toMap({ it.key }, { it.value })) - new Payload(body: body, headers: headers) - } - - Message toMessage() { - MessageBuilder.withPayload(body) - .copyHeaders(headers) - .build() - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy deleted file mode 100644 index 5a7f3f99995b..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamProducerTest.groovy +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import static org.junit.jupiter.api.Assumptions.assumeTrue - -abstract class AbstractSpringCloudStreamProducerTest extends InstrumentationSpecification implements WithRabbitProducerConsumerTrait { - private static final boolean HAS_PRODUCER_SPAN = Boolean.getBoolean("otel.instrumentation.spring-integration.producer.enabled") - - abstract Class additionalContextClass() - - def setupSpec() { - startRabbit(additionalContextClass()) - } - - def cleanupSpec() { - stopRabbit() - } - - def "has producer span"() { - assumeTrue(HAS_PRODUCER_SPAN) - - when: - producerContext.getBean("producer", Runnable).run() - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "producer" - } - span(1) { - name "testProducer.output publish" - childOf span(0) - kind PRODUCER - } - span(2) { - name "testConsumer.input process" - childOf span(1) - kind CONSUMER - } - span(3) { - name "consumer" - childOf span(2) - } - } - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamRabbitTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamRabbitTest.groovy deleted file mode 100644 index 577a35a65691..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringCloudStreamRabbitTest.groovy +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER - -abstract class AbstractSpringCloudStreamRabbitTest extends InstrumentationSpecification implements WithRabbitProducerConsumerTrait { - - abstract Class additionalContextClass() - - def setupSpec() { - startRabbit(additionalContextClass()) - } - - def cleanupSpec() { - stopRabbit() - } - - def "should propagate context through RabbitMQ"() { - when: - producerContext.getBean("producer", Runnable).run() - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "producer" - } - span(1) { - name "testConsumer.input process" - childOf span(0) - kind CONSUMER - } - span(2) { - name "consumer" - childOf span(1) - } - } - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy deleted file mode 100644 index 92f26657e2ae..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/AbstractSpringIntegrationTracingTest.groovy +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.sdk.trace.data.SpanData -import org.springframework.boot.SpringApplication -import org.springframework.boot.SpringBootConfiguration -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.boot.context.event.ApplicationReadyEvent -import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.annotation.Bean -import org.springframework.context.event.EventListener -import org.springframework.integration.channel.DirectChannel -import org.springframework.integration.channel.interceptor.GlobalChannelInterceptorWrapper -import org.springframework.messaging.Message -import org.springframework.messaging.SubscribableChannel -import org.springframework.messaging.support.ExecutorSubscribableChannel -import org.springframework.messaging.support.MessageBuilder -import spock.lang.Shared -import spock.lang.Unroll - -import java.util.concurrent.Executors - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER - -@Unroll -abstract class AbstractSpringIntegrationTracingTest extends InstrumentationSpecification { - - abstract Class additionalContextClass() - - @Shared - ConfigurableApplicationContext applicationContext - - def setupSpec() { - def contextClasses = [MessageChannelsConfig] - if (additionalContextClass() != null) { - contextClasses += additionalContextClass() - } - - def app = new SpringApplication(contextClasses as Class[]) - app.setDefaultProperties([ - "spring.main.web-application-type": "none" - ]) - applicationContext = app.run() - } - - def cleanupSpec() { - applicationContext?.close() - } - - def "should propagate context (#channelName)"() { - given: - def channel = applicationContext.getBean(channelName, SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - channel.subscribe(messageHandler) - - when: - channel.send(MessageBuilder.withPayload("test") - .build()) - - then: - def capturedMessage = messageHandler.join() - - assertTraces(1) { - trace(0, 2) { - span(0) { - name interceptorSpanName - kind CONSUMER - } - span(1) { - name "handler" - childOf span(0) - } - - def interceptorSpan = span(0) - verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan) - } - } - - cleanup: - channel.unsubscribe(messageHandler) - - where: - channelName | interceptorSpanName - "directChannel" | "application.directChannel process" - "executorChannel" | "executorChannel process" - } - - def "should not add interceptor twice"() { - given: - def channel = applicationContext.getBean("directChannel1", SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - channel.subscribe(messageHandler) - - when: - channel.send(MessageBuilder.withPayload("test") - .build()) - - then: - def capturedMessage = messageHandler.join() - - assertTraces(1) { - trace(0, 2) { - span(0) { - // the channel name is overwritten by the last bean registration - name "application.directChannel2 process" - kind CONSUMER - } - span(1) { - name "handler" - childOf span(0) - } - - def interceptorSpan = span(0) - verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan) - } - } - - cleanup: - channel.unsubscribe(messageHandler) - } - - def "should not create a span when there is already a span in the context"() { - given: - def channel = applicationContext.getBean("directChannel", SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - channel.subscribe(messageHandler) - - when: - runWithSpan("parent") { - channel.send(MessageBuilder.withPayload("test") - .build()) - } - - then: - messageHandler.join() - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - } - span(1) { - name "handler" - childOf span(0) - } - } - } - - cleanup: - channel.unsubscribe(messageHandler) - } - - def "should handle multiple message channels in a chain"() { - given: - def channel1 = applicationContext.getBean("linkedChannel1", SubscribableChannel) - def channel2 = applicationContext.getBean("linkedChannel2", SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - channel2.subscribe(messageHandler) - - when: - channel1.send(MessageBuilder.withPayload("test") - .build()) - - then: - def capturedMessage = messageHandler.join() - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "application.linkedChannel1 process" - kind CONSUMER - } - span(1) { - name "handler" - childOf span(0) - } - - def lastChannelSpan = span(0) - verifyCorrectSpanWasPropagated(capturedMessage, lastChannelSpan) - } - } - - cleanup: - channel2.unsubscribe(messageHandler) - } - - def "capture message header"() { - given: - def channel = applicationContext.getBean("directChannel", SubscribableChannel) - - def messageHandler = new CapturingMessageHandler() - channel.subscribe(messageHandler) - - when: - channel.send(MessageBuilder.withPayload("test") - .setHeader("test-message-header", "test") - .build()) - - then: - def capturedMessage = messageHandler.join() - - assertTraces(1) { - trace(0, 2) { - span(0) { - name "application.directChannel process" - kind CONSUMER - } - span(1) { - name "handler" - childOf span(0) - } - - def interceptorSpan = span(0) - verifyCorrectSpanWasPropagated(capturedMessage, interceptorSpan) - } - } - - cleanup: - channel.unsubscribe(messageHandler) - } - - static void verifyCorrectSpanWasPropagated(Message capturedMessage, SpanData parentSpan) { - def propagatedSpan = capturedMessage.headers.get("traceparent") as String - assert propagatedSpan.contains(parentSpan.traceId), "wrong trace id" - assert propagatedSpan.contains(parentSpan.spanId), "wrong span id" - } - - @SpringBootConfiguration - @EnableAutoConfiguration - static class MessageChannelsConfig { - - SubscribableChannel problematicSharedChannel = new DirectChannel() - - @Bean - SubscribableChannel directChannel() { - new DirectChannel() - } - - @Bean - SubscribableChannel directChannel1() { - problematicSharedChannel - } - - @Bean - SubscribableChannel directChannel2() { - problematicSharedChannel - } - - @Bean - SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) { - def channel = new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor()) - if (!Boolean.getBoolean("testLatestDeps")) { - // spring does not inject the interceptor in 4.1 because ExecutorSubscribableChannel isn't ChannelInterceptorAware - // in later versions spring injects the global interceptor into InterceptableChannel (which ExecutorSubscribableChannel is) - channel.addInterceptor(otelInterceptor.channelInterceptor) - } - channel - } - - @Bean - SubscribableChannel linkedChannel1() { - new DirectChannel() - } - - @Bean - SubscribableChannel linkedChannel2() { - new DirectChannel() - } - - @EventListener(ApplicationReadyEvent) - void initialize() { - linkedChannel1().subscribe { message -> - linkedChannel2().send(message) - } - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/CapturingMessageHandler.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/CapturingMessageHandler.groovy deleted file mode 100644 index 5e5985853d38..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/CapturingMessageHandler.groovy +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import org.springframework.messaging.Message -import org.springframework.messaging.MessageHandler -import org.springframework.messaging.MessagingException - -import java.util.concurrent.CompletableFuture - -import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan - -class CapturingMessageHandler implements MessageHandler { - final CompletableFuture> captured = new CompletableFuture<>() - - @Override - void handleMessage(Message message) throws MessagingException { - runWithSpan("handler") { - captured.complete(message) - } - } - - Message join() { - captured.join() - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/WithRabbitProducerConsumerTrait.groovy b/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/WithRabbitProducerConsumerTrait.groovy deleted file mode 100644 index ef72c579d3d4..000000000000 --- a/instrumentation/spring/spring-integration-4.1/testing/src/main/groovy/WithRabbitProducerConsumerTrait.groovy +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.boot.SpringApplication -import org.springframework.boot.SpringBootConfiguration -import org.springframework.boot.autoconfigure.EnableAutoConfiguration -import org.springframework.cloud.stream.annotation.EnableBinding -import org.springframework.cloud.stream.annotation.StreamListener -import org.springframework.cloud.stream.messaging.Sink -import org.springframework.cloud.stream.messaging.Source -import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.annotation.Bean -import org.springframework.messaging.support.MessageBuilder -import org.testcontainers.containers.GenericContainer -import org.testcontainers.containers.wait.strategy.Wait - -import java.time.Duration - -import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan - -trait WithRabbitProducerConsumerTrait { - - static GenericContainer rabbitMqContainer - static ConfigurableApplicationContext producerContext - static ConfigurableApplicationContext consumerContext - - def startRabbit(Class additionalContext = null) { - rabbitMqContainer = new GenericContainer('rabbitmq:latest') - .withExposedPorts(5672) - .waitingFor(Wait.forLogMessage(".*Server startup complete.*", 1)) - .withStartupTimeout(Duration.ofMinutes(2)) - rabbitMqContainer.start() - - def producerApp = new SpringApplication(getContextClasses(ProducerConfig, additionalContext)) - producerApp.setDefaultProperties([ - "spring.application.name" : "testProducer", - "spring.jmx.enabled" : false, - "spring.main.web-application-type" : "none", - "spring.rabbitmq.host" : rabbitMqContainer.host, - "spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672), - "spring.cloud.stream.bindings.output.destination": "testTopic" - ]) - producerContext = producerApp.run() - - def consumerApp = new SpringApplication(getContextClasses(ConsumerConfig, additionalContext)) - consumerApp.setDefaultProperties([ - "spring.application.name" : "testConsumer", - "spring.jmx.enabled" : false, - "spring.main.web-application-type" : "none", - "spring.rabbitmq.host" : rabbitMqContainer.host, - "spring.rabbitmq.port" : rabbitMqContainer.getMappedPort(5672), - "spring.cloud.stream.bindings.input.destination": "testTopic" - ]) - consumerContext = consumerApp.run() - } - - private Class[] getContextClasses(Class mainContext, Class additionalContext) { - def contextClasses = [mainContext] - if (additionalContext != null) { - contextClasses += additionalContext - } - contextClasses - } - - def stopRabbit() { - rabbitMqContainer?.stop() - rabbitMqContainer = null - producerContext?.close() - producerContext = null - consumerContext?.close() - consumerContext = null - } - - @SpringBootConfiguration - @EnableAutoConfiguration - @EnableBinding(Source) - static class ProducerConfig { - @Autowired - Source source - - @Bean - Runnable producer() { - return { - runWithSpan("producer") { - source.output().send(MessageBuilder.withPayload("test").build()) - } - } - } - } - - @SpringBootConfiguration - @EnableAutoConfiguration - @EnableBinding(Sink) - static class ConsumerConfig { - @StreamListener(Sink.INPUT) - void consume(String ignored) { - runWithSpan("consumer") {} - } - } -} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractComplexPropagationTest.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractComplexPropagationTest.java new file mode 100644 index 000000000000..11537efb0b42 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractComplexPropagationTest.java @@ -0,0 +1,162 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.stream.Collectors; +import javax.annotation.Nullable; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.event.EventListener; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.ExecutorChannel; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.SubscribableChannel; + +public abstract class AbstractComplexPropagationTest { + + private final Class additionalContextClass; + protected InstrumentationExtension testing; + + ConfigurableApplicationContext applicationContext; + + public AbstractComplexPropagationTest( + InstrumentationExtension testing, @Nullable Class additionalContextClass) { + this.testing = testing; + this.additionalContextClass = additionalContextClass; + } + + @BeforeEach + void setUp() { + List> contextClasses = new ArrayList<>(); + contextClasses.add(ExternalQueueConfig.class); + if (additionalContextClass != null) { + contextClasses.add(additionalContextClass); + } + SpringApplication springApplication = + new SpringApplication(contextClasses.toArray(new Class[0])); + springApplication.setDefaultProperties( + Collections.singletonMap("spring.main.web-application-type", "none")); + applicationContext = springApplication.run(); + } + + @AfterEach + void tearDown() { + if (applicationContext != null) { + applicationContext.close(); + } + } + + @Test + void shouldPropagateContextThroughAcomplexFlow() { + SubscribableChannel sendChannel = + applicationContext.getBean("sendChannel", SubscribableChannel.class); + SubscribableChannel receiveChannel = + applicationContext.getBean("receiveChannel", SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + receiveChannel.subscribe(messageHandler); + + sendChannel.send(MessageBuilder.withPayload("test").setHeader("theAnswer", "42").build()); + + messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("application.sendChannel process").hasKind(SpanKind.CONSUMER), + span -> + span.hasName("application.receiveChannel process") + .hasParent(trace.getSpan(0)) + .hasKind(SpanKind.CONSUMER), + span -> span.hasName("handler").hasParent(trace.getSpan(1)))); + + receiveChannel.unsubscribe(messageHandler); + } + + // this setup simulates separate producer/consumer and some "external" message queue in between + @SpringBootConfiguration + @EnableAutoConfiguration + static class ExternalQueueConfig { + @Bean + SubscribableChannel sendChannel() { + return new ExecutorChannel(Executors.newSingleThreadExecutor()); + } + + @Bean + SubscribableChannel receiveChannel() { + return new DirectChannel(); + } + + @Bean + BlockingQueue externalQueue() { + return new LinkedBlockingQueue<>(); + } + + @Bean(destroyMethod = "shutdownNow") + ExecutorService consumerThread() { + return Executors.newSingleThreadExecutor(); + } + + @EventListener(ApplicationReadyEvent.class) + void initialize() { + sendChannel().subscribe(message -> externalQueue().offer(Payload.from(message))); + + consumerThread() + .execute( + () -> { + while (!Thread.interrupted()) { + try { + Payload payload = externalQueue().take(); + receiveChannel().send(payload.toMessage()); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + }); + } + } + + static class Payload { + String body; + Map headers; + + Payload(String body, Map headers) { + this.body = body; + this.headers = headers; + } + + static Payload from(Message message) { + String body = (String) message.getPayload(); + Map headers = + message.getHeaders().entrySet().stream() + .filter(kv -> kv.getValue() instanceof String) + .collect(Collectors.toMap(Map.Entry::getKey, kv -> (String) kv.getValue())); + return new Payload(body, headers); + } + + Message toMessage() { + return MessageBuilder.withPayload(body).copyHeaders(headers).build(); + } + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamProducerTest.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamProducerTest.java new file mode 100644 index 000000000000..e0c24f5c69b2 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamProducerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public abstract class AbstractSpringCloudStreamProducerTest { + + @RegisterExtension RabbitExtension rabbit; + + protected final InstrumentationExtension testing; + + private static final boolean HAS_PRODUCER_SPAN = + Boolean.getBoolean("otel.instrumentation.spring-integration.producer.enabled"); + + public AbstractSpringCloudStreamProducerTest( + InstrumentationExtension testing, Class additionalContextClass) { + this.testing = testing; + rabbit = new RabbitExtension(additionalContextClass); + } + + @Test + void hasProducerSpan() { + assumeTrue(HAS_PRODUCER_SPAN); + + rabbit.getBean("producer", Runnable.class).run(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("testProducer.output publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("testConsumer.input process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)), + span -> + span.hasName("consumer") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamRabbitTest.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamRabbitTest.java new file mode 100644 index 000000000000..5020e8b7abb8 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringCloudStreamRabbitTest.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public abstract class AbstractSpringCloudStreamRabbitTest { + + @RegisterExtension RabbitExtension rabbit; + + protected final InstrumentationExtension testing; + + public AbstractSpringCloudStreamRabbitTest( + InstrumentationExtension testing, Class additionalContextClass) { + this.testing = testing; + rabbit = new RabbitExtension(additionalContextClass); + } + + @Test + void shouldPropagateContextThroughRabbitMq() { + rabbit.getBean("producer", Runnable.class).run(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("producer").hasKind(SpanKind.INTERNAL), + span -> + span.hasName("testConsumer.input process") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)), + span -> + span.hasName("consumer") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringIntegrationTracingTest.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringIntegrationTracingTest.java new file mode 100644 index 000000000000..3bc3ae5a5eec --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/AbstractSpringIntegrationTracingTest.java @@ -0,0 +1,257 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.event.EventListener; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.interceptor.GlobalChannelInterceptorWrapper; +import org.springframework.messaging.Message; +import org.springframework.messaging.SubscribableChannel; +import org.springframework.messaging.support.ExecutorSubscribableChannel; +import org.springframework.messaging.support.MessageBuilder; + +abstract class AbstractSpringIntegrationTracingTest { + + protected final InstrumentationExtension testing; + + private final Class additionalContextClass; + + ConfigurableApplicationContext applicationContext; + + public AbstractSpringIntegrationTracingTest( + InstrumentationExtension testing, Class additionalContextClass) { + this.testing = testing; + this.additionalContextClass = additionalContextClass; + } + + @BeforeEach + public void setUp() { + List> contextClasses = new ArrayList<>(); + contextClasses.add(MessageChannelsConfig.class); + if (additionalContextClass != null) { + contextClasses.add(additionalContextClass); + } + SpringApplication springApplication = + new SpringApplication(contextClasses.toArray(new Class[0])); + springApplication.setDefaultProperties( + Collections.singletonMap("spring.main.web-application-type", "none")); + applicationContext = springApplication.run(); + } + + @AfterEach + public void tearDown() { + if (applicationContext != null) { + applicationContext.close(); + } + } + + @ParameterizedTest + @CsvSource( + value = { + "directChannel,application.directChannel process", + "executorChannel,executorChannel process" + }, + delimiter = ',') + public void shouldPropagateContext(String channelName, String interceptorSpanName) { + SubscribableChannel channel = + applicationContext.getBean(channelName, SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + channel.subscribe(messageHandler); + + channel.send(MessageBuilder.withPayload("test").build()); + + Message capturedMessage = messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName(interceptorSpanName).hasKind(SpanKind.CONSUMER); + verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0)); + }, + span -> span.hasName("handler").hasParent(trace.getSpan(0)))); + + channel.unsubscribe(messageHandler); + } + + @Test + void shouldNotAddInterceptorTwice() { + SubscribableChannel channel = + applicationContext.getBean("directChannel1", SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + channel.subscribe(messageHandler); + + channel.send(MessageBuilder.withPayload("test").build()); + + Message capturedMessage = messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("application.directChannel2 process").hasKind(SpanKind.CONSUMER); + verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0)); + }, + span -> span.hasName("handler").hasParent(trace.getSpan(0)))); + + channel.unsubscribe(messageHandler); + } + + @Test + void shouldNotCreateAspanWhenThereIsAlreadyAspanInTheContext() { + SubscribableChannel channel = + applicationContext.getBean("directChannel", SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + channel.subscribe(messageHandler); + + testing.runWithSpan( + "parent", + () -> { + channel.send(MessageBuilder.withPayload("test").build()); + }); + + messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent"), + span -> span.hasName("handler").hasParent(trace.getSpan(0)))); + + channel.unsubscribe(messageHandler); + } + + @Test + void shouldHandleMultipleMessageChannelsInAchain() { + SubscribableChannel channel1 = + applicationContext.getBean("linkedChannel1", SubscribableChannel.class); + SubscribableChannel channel2 = + applicationContext.getBean("linkedChannel2", SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + channel2.subscribe(messageHandler); + + channel1.send(MessageBuilder.withPayload("test").build()); + + Message capturedMessage = messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("application.linkedChannel1 process").hasKind(SpanKind.CONSUMER); + verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0)); + }, + span -> span.hasName("handler").hasParent(trace.getSpan(0)))); + + channel2.unsubscribe(messageHandler); + } + + @Test + void captureMessageHeader() { + SubscribableChannel channel = + applicationContext.getBean("directChannel", SubscribableChannel.class); + + CapturingMessageHandler messageHandler = new CapturingMessageHandler(); + channel.subscribe(messageHandler); + + channel.send( + MessageBuilder.withPayload("test").setHeader("test-message-header", "test").build()); + + Message capturedMessage = messageHandler.join(); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> { + span.hasName("application.directChannel process").hasKind(SpanKind.CONSUMER); + verifyCorrectSpanWasPropagated(capturedMessage, trace.getSpan(0)); + }, + span -> span.hasName("handler").hasParent(trace.getSpan(0)))); + + channel.unsubscribe(messageHandler); + } + + static void verifyCorrectSpanWasPropagated(Message capturedMessage, SpanData parentSpan) { + String propagatedSpan = (String) capturedMessage.getHeaders().get("traceparent"); + assertThat(propagatedSpan).contains(parentSpan.getTraceId()); + assertThat(propagatedSpan).contains(parentSpan.getSpanId()); + } + + @SpringBootConfiguration + @EnableAutoConfiguration + public static class MessageChannelsConfig { + + SubscribableChannel problematicSharedChannel = new DirectChannel(); + + @Bean + public SubscribableChannel directChannel() { + return new DirectChannel(); + } + + @Bean + public SubscribableChannel directChannel1() { + return problematicSharedChannel; + } + + @Bean + public SubscribableChannel directChannel2() { + return problematicSharedChannel; + } + + @Bean + public SubscribableChannel executorChannel(GlobalChannelInterceptorWrapper otelInterceptor) { + ExecutorSubscribableChannel channel = + new ExecutorSubscribableChannel(Executors.newSingleThreadExecutor()); + if (!Boolean.getBoolean("testLatestDeps")) { + // spring does not inject the interceptor in 4.1 because ExecutorSubscribableChannel isn't + // ChannelInterceptorAware + // in later versions spring injects the global interceptor into InterceptableChannel (which + // ExecutorSubscribableChannel is) + channel.addInterceptor(otelInterceptor.getChannelInterceptor()); + } + return channel; + } + + @Bean + public SubscribableChannel linkedChannel1() { + return new DirectChannel(); + } + + @Bean + public SubscribableChannel linkedChannel2() { + return new DirectChannel(); + } + + @EventListener(ApplicationReadyEvent.class) + public void initialize() { + linkedChannel1().subscribe(message -> linkedChannel2().send(message)); + } + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/CapturingMessageHandler.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/CapturingMessageHandler.java new file mode 100644 index 000000000000..cb059f63dead --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/CapturingMessageHandler.java @@ -0,0 +1,29 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan; + +import java.util.concurrent.CompletableFuture; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandler; + +class CapturingMessageHandler implements MessageHandler { + final CompletableFuture> captured = new CompletableFuture<>(); + + @Override + public void handleMessage(Message message) { + runWithSpan( + "handler", + () -> { + captured.complete(message); + }); + } + + Message join() { + return captured.join(); + } +} diff --git a/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/RabbitExtension.java b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/RabbitExtension.java new file mode 100644 index 000000000000..b1b90ad47503 --- /dev/null +++ b/instrumentation/spring/spring-integration-4.1/testing/src/main/java/io/opentelemetry/javaagent/instrumentation/spring/integration/v4_1/RabbitExtension.java @@ -0,0 +1,135 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.spring.integration.v4_1; + +import static io.opentelemetry.instrumentation.testing.GlobalTraceUtil.runWithSpan; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.cloud.stream.annotation.EnableBinding; +import org.springframework.cloud.stream.annotation.StreamListener; +import org.springframework.cloud.stream.messaging.Sink; +import org.springframework.cloud.stream.messaging.Source; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.support.MessageBuilder; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class RabbitExtension implements BeforeEachCallback, AfterEachCallback { + + private GenericContainer rabbitMqContainer; + protected ConfigurableApplicationContext producerContext; + private ConfigurableApplicationContext consumerContext; + + private final Class additionalContextClass; + + public RabbitExtension(Class additionalContextClass) { + this.additionalContextClass = additionalContextClass; + } + + public T getBean(String name, Class requiredType) { + return producerContext.getBean(name, requiredType); + } + + @Override + public void beforeEach(ExtensionContext context) { + rabbitMqContainer = + new GenericContainer<>("rabbitmq:latest") + .withExposedPorts(5672) + .waitingFor(Wait.forLogMessage(".*Server startup complete.*", 1)) + .withStartupTimeout(Duration.ofMinutes(2)); + rabbitMqContainer.start(); + + SpringApplication producerApp = + new SpringApplication(getContextClasses(ProducerConfig.class, additionalContextClass)); + Map producerProperties = new HashMap<>(); + producerProperties.put("spring.application.name", "testProducer"); + producerProperties.put("spring.jmx.enabled", false); + producerProperties.put("spring.main.web-application-type", "none"); + producerProperties.put("spring.rabbitmq.host", rabbitMqContainer.getHost()); + producerProperties.put("spring.rabbitmq.port", rabbitMqContainer.getMappedPort(5672)); + producerProperties.put("spring.cloud.stream.bindings.output.destination", "testTopic"); + producerApp.setDefaultProperties(producerProperties); + producerContext = producerApp.run(); + + SpringApplication consumerApp = + new SpringApplication( + getContextClasses(ProducerConfig.ConsumerConfig.class, additionalContextClass)); + Map consumerProperties = new HashMap<>(); + consumerProperties.put("spring.application.name", "testConsumer"); + consumerProperties.put("spring.jmx.enabled", false); + consumerProperties.put("spring.main.web-application-type", "none"); + consumerProperties.put("spring.rabbitmq.host", rabbitMqContainer.getHost()); + consumerProperties.put("spring.rabbitmq.port", rabbitMqContainer.getMappedPort(5672)); + consumerProperties.put("spring.cloud.stream.bindings.input.destination", "testTopic"); + consumerApp.setDefaultProperties(consumerProperties); + consumerContext = consumerApp.run(); + } + + @Override + public void afterEach(ExtensionContext context) { + if (rabbitMqContainer != null) { + rabbitMqContainer.stop(); + } + if (producerContext != null) { + producerContext.close(); + } + if (consumerContext != null) { + consumerContext.close(); + } + } + + private static Class[] getContextClasses(Class mainContext, Class additionalContext) { + List> contextClasses = new ArrayList<>(); + contextClasses.add(mainContext); + if (additionalContext != null) { + contextClasses.add(additionalContext); + } + return contextClasses.toArray(new Class[0]); + } + + @SpringBootConfiguration + @EnableAutoConfiguration + @EnableBinding(Source.class) + static class ProducerConfig { + @Autowired Source source; + + @Bean + Runnable producer() { + return () -> + runWithSpan( + "producer", + () -> { + source.output().send(MessageBuilder.withPayload("test").build()); + }); + } + + @SpringBootConfiguration + @EnableAutoConfiguration + @EnableBinding(Sink.class) + static class ConsumerConfig { + @StreamListener(Sink.INPUT) + void consume(String ignored) { + runWithSpan( + "consumer", + () -> { + // do nothing + }); + } + } + } +}