diff --git a/CHANGELOG.md b/CHANGELOG.md index 29fe162a5e72f..6e94817b69946 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Refactor Compressors from CompressorFactory to CompressorRegistry for extensibility ([#9262](https://github.com/opensearch-project/OpenSearch/pull/9262)) - Fix sort related ITs for concurrent search ([#9177](https://github.com/opensearch-project/OpenSearch/pull/9466) - [Remote Store] Implicitly use replication type SEGMENT for remote store clusters ([#9264](https://github.com/opensearch-project/OpenSearch/pull/9264)) +- Add support to use trace propagated from client ([#9506](https://github.com/opensearch-project/OpenSearch/pull/9506)) ### Deprecated diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java index 2f3a425f96703..bc1a08e2d3c72 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/DefaultTracer.java @@ -12,6 +12,9 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Optional; /** * @@ -44,7 +47,7 @@ public SpanScope startSpan(String spanName) { @Override public SpanScope startSpan(String spanName, Attributes attributes) { - return startSpan(spanName, null, attributes); + return startSpan(spanName, (SpanContext) null, attributes); } @Override @@ -97,4 +100,10 @@ protected void addDefaultAttributes(Span span) { span.addAttribute(THREAD_NAME, Thread.currentThread().getName()); } + @Override + public SpanScope startSpan(String spanName, Map> headers, Attributes attributes) { + Optional propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers); + return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes); + } + } diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java index bc55b26abc761..40cc5dfd2d743 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/Tracer.java @@ -9,6 +9,7 @@ package org.opensearch.telemetry.tracing; import org.opensearch.telemetry.tracing.attributes.Attributes; +import org.opensearch.telemetry.tracing.http.HttpTracer; import java.io.Closeable; @@ -18,7 +19,7 @@ * * All methods on the Tracer object are multi-thread safe. */ -public interface Tracer extends Closeable { +public interface Tracer extends HttpTracer, Closeable { /** * Starts the {@link Span} with given name diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java index 3e4a377d33a3d..52e272afcd07f 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/TracingContextPropagator.java @@ -8,7 +8,9 @@ package org.opensearch.telemetry.tracing; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; /** @@ -23,7 +25,15 @@ public interface TracingContextPropagator { * @param props properties * @return current span */ - Span extract(Map props); + Optional extract(Map props); + + /** + * Extracts current span from HTTP headers. + * + * @param headers request headers to extract the context from + * @return current span + */ + Optional extractFromHeaders(Map> headers); /** * Injects tracing context diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java new file mode 100644 index 0000000000000..64ef84335a95b --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/HttpTracer.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.http; + +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.attributes.Attributes; + +import java.util.List; +import java.util.Map; + +/** + * HttpTracer helps in creating a {@link Span} which reads the incoming tracing information + * from the HttpRequest header and propagate the span accordingly. + * + * All methods on the Tracer object are multi-thread safe. + */ +public interface HttpTracer { + /** + * Start the span with propagating the tracing info from the HttpRequest header. + * + * @param spanName span name. + * @param header http request header. + * @param attributes span attributes. + * @return scope of the span, must be closed with explicit close or with try-with-resource + */ + SpanScope startSpan(String spanName, Map> header, Attributes attributes); +} diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java new file mode 100644 index 0000000000000..9feb862a4e010 --- /dev/null +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/http/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Contains No-op implementations + */ +package org.opensearch.telemetry.tracing.http; diff --git a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java index 1a37ed0d0f245..2ff50bf3bcb18 100644 --- a/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java +++ b/libs/telemetry/src/main/java/org/opensearch/telemetry/tracing/noop/NoopTracer.java @@ -13,6 +13,9 @@ import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.attributes.Attributes; +import java.util.List; +import java.util.Map; + /** * No-op implementation of Tracer * @@ -51,4 +54,9 @@ public SpanContext getCurrentSpan() { public void close() { } + + @Override + public SpanScope startSpan(String spanName, Map> header, Attributes attributes) { + return SpanScope.NO_OP; + } } diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java index 07abd43c8dd7b..150992da06f89 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/DefaultTracerTests.java @@ -17,6 +17,10 @@ import org.junit.Assert; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; @@ -104,6 +108,28 @@ public void testCreateSpanWithParent() { Assert.assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan().getParentSpan()); } + public void testHttpTracer() { + String traceId = "trace_id"; + String spanId = "span_id"; + TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); + + DefaultTracer defaultTracer = new DefaultTracer( + tracingTelemetry, + new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry) + ); + + Map> requestHeaders = new HashMap<>(); + requestHeaders.put("traceparent", Arrays.asList(traceId + "~" + spanId)); + + SpanScope spanScope = defaultTracer.startSpan("test_span", requestHeaders, Attributes.EMPTY); + SpanContext currentSpan = defaultTracer.getCurrentSpan(); + assertNotNull(currentSpan); + assertEquals(traceId, currentSpan.getSpan().getTraceId()); + assertEquals(traceId, currentSpan.getSpan().getParentSpan().getTraceId()); + assertEquals(spanId, currentSpan.getSpan().getParentSpan().getSpanId()); + spanScope.close(); + } + public void testCreateSpanWithNullParent() { TracingTelemetry tracingTelemetry = new MockTracingTelemetry(); DefaultTracer defaultTracer = new DefaultTracer( @@ -111,7 +137,7 @@ public void testCreateSpanWithNullParent() { new ThreadContextBasedTracerContextStorage(new ThreadContext(Settings.EMPTY), tracingTelemetry) ); - defaultTracer.startSpan("span_name", null, Attributes.EMPTY); + defaultTracer.startSpan("span_name"); Assert.assertEquals("span_name", defaultTracer.getCurrentSpan().getSpan().getSpanName()); Assert.assertEquals(null, defaultTracer.getCurrentSpan().getSpan().getParentSpan()); diff --git a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java index f1df3b24e1c9b..bcd8ffe41a17b 100644 --- a/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java +++ b/libs/telemetry/src/test/java/org/opensearch/telemetry/tracing/TraceableRunnableTests.java @@ -54,16 +54,16 @@ public void testRunnableWithParent() throws Exception { DefaultTracer defaultTracer = new DefaultTracer(new MockTracingTelemetry(), contextStorage); defaultTracer.startSpan(parentSpanName); SpanContext parentSpan = defaultTracer.getCurrentSpan(); - AtomicReference currrntSpan = new AtomicReference<>(new SpanContext(null)); + AtomicReference currentSpan = new AtomicReference<>(); final AtomicBoolean isRunnableCompleted = new AtomicBoolean(false); TraceableRunnable traceableRunnable = new TraceableRunnable(defaultTracer, spanName, parentSpan, Attributes.EMPTY, () -> { isRunnableCompleted.set(true); - currrntSpan.set(defaultTracer.getCurrentSpan()); + currentSpan.set(defaultTracer.getCurrentSpan()); }); traceableRunnable.run(); assertTrue(isRunnableCompleted.get()); - assertEquals(spanName, currrntSpan.get().getSpan().getSpanName()); - assertEquals(parentSpan.getSpan(), currrntSpan.get().getSpan().getParentSpan()); + assertEquals(spanName, currentSpan.get().getSpan().getSpanName()); + assertEquals(parentSpan.getSpan(), currentSpan.get().getSpan().getParentSpan()); assertEquals(parentSpan.getSpan(), defaultTracer.getCurrentSpan().getSpan()); } } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java index 739a6367ccb2e..f8fe885ee450c 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagator.java @@ -8,7 +8,12 @@ package org.opensearch.telemetry.tracing; +import org.opensearch.core.common.Strings; + +import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; import io.opentelemetry.api.OpenTelemetry; @@ -32,8 +37,12 @@ public OTelTracingContextPropagator(OpenTelemetry openTelemetry) { } @Override - public Span extract(Map props) { + public Optional extract(Map props) { Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), props, TEXT_MAP_GETTER); + return Optional.ofNullable(getPropagatedSpan(context)); + } + + private static OTelPropagatedSpan getPropagatedSpan(Context context) { if (context != null) { io.opentelemetry.api.trace.Span span = io.opentelemetry.api.trace.Span.fromContext(context); return new OTelPropagatedSpan(span); @@ -41,6 +50,12 @@ public Span extract(Map props) { return null; } + @Override + public Optional extractFromHeaders(Map> headers) { + Context context = openTelemetry.getPropagators().getTextMapPropagator().extract(Context.current(), headers, HEADER_TEXT_MAP_GETTER); + return Optional.ofNullable(getPropagatedSpan(context)); + } + @Override public void inject(Span currentSpan, BiConsumer setter) { openTelemetry.getPropagators().getTextMapPropagator().inject(context((OTelSpan) currentSpan), setter, TEXT_MAP_SETTER); @@ -72,4 +87,23 @@ public String get(Map headers, String key) { } }; + private static final TextMapGetter>> HEADER_TEXT_MAP_GETTER = new TextMapGetter<>() { + @Override + public Iterable keys(Map> headers) { + if (headers != null) { + return headers.keySet(); + } else { + return Collections.emptySet(); + } + } + + @Override + public String get(Map> headers, String key) { + if (headers != null && headers.containsKey(key)) { + return Strings.collectionToCommaDelimitedString(headers.get(key)); + } + return null; + } + }; + } diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java index fcf7495f331af..16a3ec9493d5d 100644 --- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java +++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/OTelTracingContextPropagatorTests.java @@ -10,7 +10,9 @@ import org.opensearch.test.OpenSearchTestCase; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import io.opentelemetry.api.OpenTelemetry; @@ -19,6 +21,7 @@ import io.opentelemetry.api.trace.TraceFlags; import io.opentelemetry.api.trace.TraceState; import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; +import io.opentelemetry.context.Context; import io.opentelemetry.context.propagation.ContextPropagators; import static org.mockito.Mockito.mock; @@ -48,8 +51,39 @@ public void testExtractTracerContextFromHeader() { OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); - org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extract(requestHeaders).orElse(null); assertEquals(TRACE_ID, span.getTraceId()); assertEquals(SPAN_ID, span.getSpanId()); } + + public void testExtractTracerContextFromHttpHeader() { + Map> requestHeaders = new HashMap<>(); + requestHeaders.put("traceparent", Arrays.asList("00-" + TRACE_ID + "-" + SPAN_ID + "-00")); + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(requestHeaders).get(); + assertEquals(TRACE_ID, span.getTraceId()); + assertEquals(SPAN_ID, span.getSpanId()); + } + + public void testExtractTracerContextFromHttpHeaderNull() { + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(null).get(); + org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root())); + assertEquals(propagatedSpan.getTraceId(), span.getTraceId()); + assertEquals(propagatedSpan.getSpanId(), span.getSpanId()); + } + + public void testExtractTracerContextFromHttpHeaderEmpty() { + OpenTelemetry mockOpenTelemetry = mock(OpenTelemetry.class); + when(mockOpenTelemetry.getPropagators()).thenReturn(ContextPropagators.create(W3CTraceContextPropagator.getInstance())); + TracingContextPropagator tracingContextPropagator = new OTelTracingContextPropagator(mockOpenTelemetry); + org.opensearch.telemetry.tracing.Span span = tracingContextPropagator.extractFromHeaders(new HashMap<>()).get(); + org.opensearch.telemetry.tracing.Span propagatedSpan = new OTelPropagatedSpan(Span.fromContext(Context.root())); + assertEquals(propagatedSpan.getTraceId(), span.getTraceId()); + assertEquals(propagatedSpan.getSpanId(), span.getSpanId()); + } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java index c009ab2391aab..a32facdc71146 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/ThreadContextBasedTracerContextStorage.java @@ -90,6 +90,7 @@ private Optional spanFromThreadContext(String key) { } private Span spanFromHeader() { - return tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); + Optional span = tracingTelemetry.getContextPropagator().extract(threadContext.getHeaders()); + return span.orElse(null); } } diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java index 466abaac435f3..b699471be7f4c 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/WrappedTracer.java @@ -13,6 +13,8 @@ import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.IOException; +import java.util.List; +import java.util.Map; /** * Wrapper implementation of Tracer. This delegates call to right tracer based on the tracer settings @@ -42,7 +44,7 @@ public SpanScope startSpan(String spanName) { @Override public SpanScope startSpan(String spanName, Attributes attributes) { - return startSpan(spanName, null, attributes); + return startSpan(spanName, (SpanContext) null, attributes); } @Override @@ -66,4 +68,9 @@ public void close() throws IOException { Tracer getDelegateTracer() { return telemetrySettings.isTracingEnabled() ? defaultTracer : NoopTracer.INSTANCE; } + + @Override + public SpanScope startSpan(String spanName, Map> headers, Attributes attributes) { + return defaultTracer.startSpan(spanName, headers, attributes); + } } diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java index f45381e3b4cc4..b70fe81d5f9c4 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/WrappedTracerTests.java @@ -51,7 +51,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracer() throws Excepti wrappedTracer.startSpan("foo"); assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer); - verify(mockDefaultTracer).startSpan(eq("foo"), eq(null), any(Attributes.class)); + verify(mockDefaultTracer).startSpan(eq("foo"), eq((SpanContext) null), any(Attributes.class)); } } @@ -64,7 +64,7 @@ public void testStartSpanWithTracingEnabledInvokesDefaultTracerWithAttr() throws wrappedTracer.startSpan("foo", attributes); assertTrue(wrappedTracer.getDelegateTracer() instanceof DefaultTracer); - verify(mockDefaultTracer).startSpan("foo", null, attributes); + verify(mockDefaultTracer).startSpan("foo", (SpanContext) null, attributes); } } diff --git a/test/telemetry/build.gradle b/test/telemetry/build.gradle index fbabe43aa5e5a..ca523a9204f4c 100644 --- a/test/telemetry/build.gradle +++ b/test/telemetry/build.gradle @@ -13,6 +13,7 @@ apply plugin: 'opensearch.build' apply plugin: 'opensearch.publish' dependencies { + api project(":libs:opensearch-core") api project(":libs:opensearch-common") api project(":libs:opensearch-telemetry") } diff --git a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java index dccf062df5ca5..7525b4424c243 100644 --- a/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java +++ b/test/telemetry/src/main/java/org/opensearch/test/telemetry/tracing/MockTracingContextPropagator.java @@ -8,13 +8,17 @@ package org.opensearch.test.telemetry.tracing; +import org.opensearch.core.common.Strings; import org.opensearch.telemetry.tracing.Span; import org.opensearch.telemetry.tracing.TracingContextPropagator; import org.opensearch.telemetry.tracing.attributes.Attributes; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Optional; import java.util.function.BiConsumer; +import java.util.stream.Collectors; /** * Mock {@link TracingContextPropagator} to persist the span for internode communication. @@ -34,18 +38,31 @@ public MockTracingContextPropagator(SpanProcessor spanProcessor) { } @Override - public Span extract(Map props) { + public Optional extract(Map props) { String value = props.get(TRACE_PARENT); if (value != null) { String[] values = value.split(SEPARATOR); String traceId = values[0]; String spanId = values[1]; - return new MockSpan(null, null, traceId, spanId, spanProcessor, Attributes.EMPTY); + return Optional.of(new MockSpan(null, null, traceId, spanId, spanProcessor, Attributes.EMPTY)); } else { - return null; + return Optional.empty(); } } + @Override + public Optional extractFromHeaders(Map> headers) { + if (headers != null) { + Map convertedHeader = headers.entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> Strings.collectionToCommaDelimitedString(e.getValue()))); + return extract(convertedHeader); + } else { + return Optional.empty(); + } + + } + @Override public void inject(Span currentSpan, BiConsumer setter) { if (currentSpan instanceof MockSpan) {