Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Oct 3, 2023
1 parent 08da91f commit eccc611
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.channels;

import org.opensearch.Version;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.listener.TraceableActionListener;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.TransportChannel;

import java.io.IOException;

/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTransportChannel implements TransportChannel {

private final TransportChannel delegate;
private final Span span;
private final Tracer tracer;

private final TcpChannel tcpChannel;

private final static ActionListener<Void> DUMMY_ACTION_LISTENER = ActionListener.wrap(() -> {});

/**
* Constructor.
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) {
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
this.tcpChannel = tcpChannel;
}

/**
* Factory method.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return transport channel
*/
public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
tcpChannel.addCloseListener(TraceableActionListener.create(DUMMY_ACTION_LISTENER, span, tracer));
return new TraceableTransportChannel(delegate, span, tracer, tcpChannel);
} else {
return delegate;
}
}

@Override
public String getProfileName() {
return delegate.getProfileName();
}

@Override
public String getChannelType() {
return delegate.getChannelType();
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(response);
} finally {
span.endSpan();
}
}

@Override
public void sendResponse(Exception exception) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(exception);
} finally {
span.setError(exception);
span.endSpan();
}
}

@Override
public Version getVersion() {
return delegate.getVersion();
}
}
19 changes: 10 additions & 9 deletions server/src/main/java/org/opensearch/transport/InboundHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTcpChannel;
import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
Expand Down Expand Up @@ -193,7 +193,6 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
final long requestId = header.getRequestId();
final Version version = header.getVersion();
Span span = tracer.startSpan(SpanBuilder.from(action, channel));
channel = TraceableTcpChannel.create(channel, span, tracer);
try (SpanScope spanScope = tracer.withSpanInScope(span)) {
if (header.isHandshake()) {
messageListener.onRequestReceived(requestId, action);
Expand All @@ -212,11 +211,12 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
try {
handshaker.handleHandshake(transportChannel, requestId, stream);
handshaker.handleHandshake(traceableTransportChannel, requestId, stream);
} catch (Exception e) {
if (Version.CURRENT.isCompatible(header.getVersion())) {
sendErrorResponse(action, transportChannel, e);
sendErrorResponse(action, traceableTransportChannel, e);
} else {
logger.warn(
new ParameterizedMessage(
Expand All @@ -241,10 +241,11 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
try {
messageListener.onRequestReceived(requestId, action);
if (message.isShortCircuit()) {
sendErrorResponse(action, transportChannel, message.getException());
sendErrorResponse(action, traceableTransportChannel, message.getException());
} else {
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(stream, header.getVersion());
Expand All @@ -258,16 +259,16 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
try {
reg.processMessageReceived(request, transportChannel);
reg.processMessageReceived(request, traceableTransportChannel);
} catch (Exception e) {
sendErrorResponse(reg.getAction(), transportChannel, e);
sendErrorResponse(reg.getAction(), traceableTransportChannel, e);
}
} else {
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, transportChannel));
threadPool.executor(executor).execute(new RequestHandler<>(reg, request, traceableTransportChannel));
}
}
} catch (Exception e) {
sendErrorResponse(action, transportChannel, e);
sendErrorResponse(action, traceableTransportChannel, e);
}
}
}
Expand Down

0 comments on commit eccc611

Please sign in to comment.