diff --git a/CHANGELOG.md b/CHANGELOG.md index d26e8881b57f5..b5eda7234d901 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Changed - Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415)) - Allow parameterization of tests with OpenSearchIntegTestCase.SuiteScopeTestCase annotation ([#9916](https://github.com/opensearch-project/OpenSearch/pull/9916)) +- Add instrumentation in transport service. ([#10042](https://github.com/opensearch-project/OpenSearch/pull/10042)) ### Deprecated diff --git a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java index 2d0111e64faad..8a49a0abf5512 100644 --- a/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java +++ b/plugins/telemetry-otel/src/internalClusterTest/java/org/opensearch/telemetry/tracing/TelemetryTracerEnabledSanityIT.java @@ -89,7 +89,7 @@ public void testSanityChecksWhenTracingEnabled() throws Exception { InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE; if (!exporter.getFinishedSpanItems().isEmpty()) { - validators.validate(exporter.getFinishedSpanItems(), 2); + validators.validate(exporter.getFinishedSpanItems(), 6); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java index 90bdcf7fded11..1f6c8eac6c391 100644 --- a/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/discovery/single/SingleNodeDiscoveryIT.java @@ -76,6 +76,7 @@ public void testSingleNodesDoNotDiscoverEachOther() throws IOException, Interrup @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(featureFlagSettings()) .put("discovery.type", "single-node") .put("transport.type", getTestTransportType()) /* @@ -142,6 +143,7 @@ public boolean innerMatch(final LogEvent event) { @Override public Settings nodeSettings(int nodeOrdinal) { return Settings.builder() + .put(featureFlagSettings()) .put("discovery.type", "zen") .put("transport.type", getTestTransportType()) .put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s") diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java new file mode 100644 index 0000000000000..abddfcc6cebc1 --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/TraceableTransportResponseHandler.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry.tracing.handler; + +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.transport.TransportResponse; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanScope; +import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; +import java.util.Objects; + +/** + * Tracer wrapped {@link TransportResponseHandler} + * @param TransportResponse + */ +public class TraceableTransportResponseHandler implements TransportResponseHandler { + + private final Span span; + private final TransportResponseHandler delegate; + private final Tracer tracer; + + /** + * Constructor. + * + * @param delegate delegate + * @param span span + * @param tracer tracer + */ + private TraceableTransportResponseHandler(TransportResponseHandler delegate, Span span, Tracer tracer) { + this.delegate = Objects.requireNonNull(delegate); + this.span = Objects.requireNonNull(span); + this.tracer = Objects.requireNonNull(tracer); + } + + /** + * Factory method. + * @param delegate delegate + * @param span span + * @param tracer tracer + * @return transportResponseHandler + */ + public static TransportResponseHandler create( + TransportResponseHandler delegate, + Span span, + Tracer tracer + ) { + if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) { + return new TraceableTransportResponseHandler(delegate, span, tracer); + } else { + return delegate; + } + } + + @Override + public T read(StreamInput in) throws IOException { + return delegate.read(in); + } + + @Override + public void handleResponse(T response) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleResponse(response); + } finally { + span.endSpan(); + } + } + + @Override + public void handleException(TransportException exp) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleException(exp); + } finally { + span.setError(exp); + span.endSpan(); + } + } + + @Override + public String executor() { + return delegate.executor(); + } + + @Override + public String toString() { + return delegate.toString(); + } + + @Override + public void handleRejection(Exception exp) { + try (SpanScope scope = tracer.withSpanInScope(span)) { + delegate.handleRejection(exp); + } finally { + span.endSpan(); + } + } +} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java b/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java new file mode 100644 index 0000000000000..ff9f8f57dc07c --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/tracing/handler/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes needed for tracing requests. + */ +package org.opensearch.telemetry.tracing.handler; diff --git a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java index 0b39983cc3bee..90e94e52515ce 100644 --- a/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/transport/TransportResponseHandler.java @@ -52,6 +52,13 @@ public interface TransportResponseHandler extends W String executor(); + /** + * This method should be handling the rejection/failure scenarios where connection to the node is rejected or failed. + * It should be used to clear up the resources held by the {@link TransportResponseHandler}. + * @param exp exception + */ + default void handleRejection(Exception exp) {}; + default TransportResponseHandler wrap(Function converter, Writeable.Reader reader) { final TransportResponseHandler self = this; return new TransportResponseHandler() { diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index d6f7d21ba0117..b7684d472f30a 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -68,7 +68,11 @@ import org.opensearch.node.NodeClosedException; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.Span; +import org.opensearch.telemetry.tracing.SpanBuilder; +import org.opensearch.telemetry.tracing.SpanScope; import org.opensearch.telemetry.tracing.Tracer; +import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -336,6 +340,7 @@ protected void doStop() { getExecutorService().execute(new AbstractRunnable() { @Override public void onRejection(Exception e) { + holderToNotify.handler().handleRejection(e); // if we get rejected during node shutdown we don't wanna bubble it up logger.debug( () -> new ParameterizedMessage( @@ -348,6 +353,7 @@ public void onRejection(Exception e) { @Override public void onFailure(Exception e) { + holderToNotify.handler().handleRejection(e); logger.warn( () -> new ParameterizedMessage( "failed to notify response handler on exception, action: {}", @@ -872,53 +878,60 @@ public final void sendRequest( final TransportRequestOptions options, final TransportResponseHandler handler ) { - try { - logger.debug("Action: " + action); - final TransportResponseHandler delegate; - if (request.getParentTask().isSet()) { - // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. - final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode()); - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - unregisterChildNode.close(); - handler.handleResponse(response); - } + final Span span = tracer.startSpan(SpanBuilder.from(action, connection)); + try (SpanScope spanScope = tracer.withSpanInScope(span)) { + TransportResponseHandler traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer); + try { + logger.debug("Action: " + action); + final TransportResponseHandler delegate; + if (request.getParentTask().isSet()) { + // TODO: capture the connection instead so that we can cancel child tasks on the remote connections. + final Releasable unregisterChildNode = taskManager.registerChildNode( + request.getParentTask().getId(), + connection.getNode() + ); + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + unregisterChildNode.close(); + traceableTransportResponseHandler.handleResponse(response); + } - @Override - public void handleException(TransportException exp) { - unregisterChildNode.close(); - handler.handleException(exp); - } + @Override + public void handleException(TransportException exp) { + unregisterChildNode.close(); + traceableTransportResponseHandler.handleException(exp); + } - @Override - public String executor() { - return handler.executor(); - } + @Override + public String executor() { + return traceableTransportResponseHandler.executor(); + } - @Override - public T read(StreamInput in) throws IOException { - return handler.read(in); - } + @Override + public T read(StreamInput in) throws IOException { + return traceableTransportResponseHandler.read(in); + } - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); - } - }; - } else { - delegate = handler; - } - asyncSender.sendRequest(connection, action, request, options, delegate); - } catch (final Exception ex) { - // the caller might not handle this so we invoke the handler - final TransportException te; - if (ex instanceof TransportException) { - te = (TransportException) ex; - } else { - te = new TransportException("failure to send", ex); + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = traceableTransportResponseHandler; + } + asyncSender.sendRequest(connection, action, request, options, delegate); + } catch (final Exception ex) { + // the caller might not handle this so we invoke the handler + final TransportException te; + if (ex instanceof TransportException) { + te = (TransportException) ex; + } else { + te = new TransportException("failure to send", ex); + } + traceableTransportResponseHandler.handleException(te); } - handler.handleException(te); } } @@ -1040,6 +1053,7 @@ private void sendRequestInternal( threadPool.executor(executor).execute(new AbstractRunnable() { @Override public void onRejection(Exception e) { + contextToNotify.handler().handleRejection(e); // if we get rejected during node shutdown we don't wanna bubble it up logger.debug( () -> new ParameterizedMessage( @@ -1052,6 +1066,7 @@ public void onRejection(Exception e) { @Override public void onFailure(Exception e) { + contextToNotify.handler().handleRejection(e); logger.warn( () -> new ParameterizedMessage( "failed to notify response handler on exception, action: {}", diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 24a94fb08fb5d..3da57eb2afdf4 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2345,9 +2345,8 @@ public static void afterClass() throws Exception { INSTANCE.printTestMessage("cleaning up after"); INSTANCE.afterInternal(true); checkStaticState(true); - StrictCheckSpanProcessor.validateTracingStateOnShutdown(); } - + StrictCheckSpanProcessor.validateTracingStateOnShutdown(); } finally { SUITE_SEED = null; currentCluster = null;