Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport] [2.x] Add instrumentation in transport service. #10072

Merged
merged 1 commit into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Changed
- Add instrumentation in rest and network layer. ([#9415](https://github.com/opensearch-project/OpenSearch/pull/9415))
- Allow parameterization of tests with OpenSearchIntegTestCase.SuiteScopeTestCase annotation ([#9916](https://github.com/opensearch-project/OpenSearch/pull/9916))
- Add instrumentation in transport service. ([#10042](https://github.com/opensearch-project/OpenSearch/pull/10042))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void testSanityChecksWhenTracingEnabled() throws Exception {

InMemorySingletonSpanExporter exporter = InMemorySingletonSpanExporter.INSTANCE;
if (!exporter.getFinishedSpanItems().isEmpty()) {
validators.validate(exporter.getFinishedSpanItems(), 2);
validators.validate(exporter.getFinishedSpanItems(), 6);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testSingleNodesDoNotDiscoverEachOther() throws IOException, Interrup
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(featureFlagSettings())
.put("discovery.type", "single-node")
.put("transport.type", getTestTransportType())
/*
Expand Down Expand Up @@ -142,6 +143,7 @@ public boolean innerMatch(final LogEvent event) {
@Override
public Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(featureFlagSettings())
.put("discovery.type", "zen")
.put("transport.type", getTestTransportType())
.put(DiscoverySettings.INITIAL_STATE_TIMEOUT_SETTING.getKey(), "0s")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.handler;

import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.TransportException;
import org.opensearch.transport.TransportResponseHandler;

import java.io.IOException;
import java.util.Objects;

/**
* Tracer wrapped {@link TransportResponseHandler}
* @param <T> TransportResponse
*/
public class TraceableTransportResponseHandler<T extends TransportResponse> implements TransportResponseHandler<T> {

private final Span span;
private final TransportResponseHandler<T> delegate;
private final Tracer tracer;

/**
* Constructor.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
private TraceableTransportResponseHandler(TransportResponseHandler<T> delegate, Span span, Tracer tracer) {
this.delegate = Objects.requireNonNull(delegate);
this.span = Objects.requireNonNull(span);
this.tracer = Objects.requireNonNull(tracer);
}

/**
* Factory method.
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return transportResponseHandler
*/
public static <S extends TransportResponse> TransportResponseHandler<S> create(
TransportResponseHandler<S> delegate,
Span span,
Tracer tracer
) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
return new TraceableTransportResponseHandler<S>(delegate, span, tracer);
} else {
return delegate;
}
}

@Override
public T read(StreamInput in) throws IOException {
return delegate.read(in);
}

@Override
public void handleResponse(T response) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleResponse(response);
} finally {
span.endSpan();
}
}

@Override
public void handleException(TransportException exp) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleException(exp);
} finally {
span.setError(exp);
span.endSpan();
}
}

@Override
public String executor() {
return delegate.executor();
}

@Override
public String toString() {
return delegate.toString();
}

@Override
public void handleRejection(Exception exp) {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.handleRejection(exp);
} finally {
span.endSpan();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* This package contains classes needed for tracing requests.
*/
package org.opensearch.telemetry.tracing.handler;
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ public interface TransportResponseHandler<T extends TransportResponse> extends W

String executor();

/**
* This method should be handling the rejection/failure scenarios where connection to the node is rejected or failed.
* It should be used to clear up the resources held by the {@link TransportResponseHandler}.
* @param exp exception
*/
default void handleRejection(Exception exp) {};

default <Q extends TransportResponse> TransportResponseHandler<Q> wrap(Function<Q, T> converter, Writeable.Reader<Q> reader) {
final TransportResponseHandler<T> self = this;
return new TransportResponseHandler<Q>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@
import org.opensearch.node.NodeClosedException;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskManager;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.handler.TraceableTransportResponseHandler;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -336,6 +340,7 @@ protected void doStop() {
getExecutorService().execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
holderToNotify.handler().handleRejection(e);
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -348,6 +353,7 @@ public void onRejection(Exception e) {

@Override
public void onFailure(Exception e) {
holderToNotify.handler().handleRejection(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
Expand Down Expand Up @@ -872,53 +878,60 @@ public final <T extends TransportResponse> void sendRequest(
final TransportRequestOptions options,
final TransportResponseHandler<T> handler
) {
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(request.getParentTask().getId(), connection.getNode());
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
handler.handleResponse(response);
}
final Span span = tracer.startSpan(SpanBuilder.from(action, connection));
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
TransportResponseHandler<T> traceableTransportResponseHandler = TraceableTransportResponseHandler.create(handler, span, tracer);
try {
logger.debug("Action: " + action);
final TransportResponseHandler<T> delegate;
if (request.getParentTask().isSet()) {
// TODO: capture the connection instead so that we can cancel child tasks on the remote connections.
final Releasable unregisterChildNode = taskManager.registerChildNode(
request.getParentTask().getId(),
connection.getNode()
);
delegate = new TransportResponseHandler<T>() {
@Override
public void handleResponse(T response) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleResponse(response);
}

@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
handler.handleException(exp);
}
@Override
public void handleException(TransportException exp) {
unregisterChildNode.close();
traceableTransportResponseHandler.handleException(exp);
}

@Override
public String executor() {
return handler.executor();
}
@Override
public String executor() {
return traceableTransportResponseHandler.executor();
}

@Override
public T read(StreamInput in) throws IOException {
return handler.read(in);
}
@Override
public T read(StreamInput in) throws IOException {
return traceableTransportResponseHandler.read(in);
}

@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = handler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
@Override
public String toString() {
return getClass().getName() + "/[" + action + "]:" + handler.toString();
}
};
} else {
delegate = traceableTransportResponseHandler;
}
asyncSender.sendRequest(connection, action, request, options, delegate);
} catch (final Exception ex) {
// the caller might not handle this so we invoke the handler
final TransportException te;
if (ex instanceof TransportException) {
te = (TransportException) ex;
} else {
te = new TransportException("failure to send", ex);
}
traceableTransportResponseHandler.handleException(te);
}
handler.handleException(te);
}
}

Expand Down Expand Up @@ -1040,6 +1053,7 @@ private <T extends TransportResponse> void sendRequestInternal(
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
public void onRejection(Exception e) {
contextToNotify.handler().handleRejection(e);
// if we get rejected during node shutdown we don't wanna bubble it up
logger.debug(
() -> new ParameterizedMessage(
Expand All @@ -1052,6 +1066,7 @@ public void onRejection(Exception e) {

@Override
public void onFailure(Exception e) {
contextToNotify.handler().handleRejection(e);
logger.warn(
() -> new ParameterizedMessage(
"failed to notify response handler on exception, action: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2345,9 +2345,8 @@ public static void afterClass() throws Exception {
INSTANCE.printTestMessage("cleaning up after");
INSTANCE.afterInternal(true);
checkStaticState(true);
StrictCheckSpanProcessor.validateTracingStateOnShutdown();
}

StrictCheckSpanProcessor.validateTracingStateOnShutdown();
} finally {
SUITE_SEED = null;
currentCluster = null;
Expand Down
Loading