Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tracing Instrumentation] Add instrumentation in InboundHandler #10143

Merged
merged 10 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address review comment
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Oct 4, 2023
commit 6b3c1322049913771d737b53228df0f01f8a54af
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.BaseTcpTransportChannel;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.TransportChannel;

Expand All @@ -23,7 +24,7 @@
/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTransportChannel implements TransportChannel {
public class TraceableTcpTransportChannel extends BaseTcpTransportChannel {

private final TransportChannel delegate;
private final Span span;
Expand All @@ -34,8 +35,10 @@
* @param delegate delegate
* @param span span
* @param tracer tracer
* @param channel channel
*/
public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) {
public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel channel) {
super(channel);
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
Expand All @@ -47,6 +50,7 @@
* @param delegate delegate
* @param span span
* @param tracer tracer
* @param tcpChannel tcpChannel
* @return transport channel
*/
public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) {
reta marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -65,7 +69,7 @@
}
});

return new TraceableTransportChannel(delegate, span, tracer, tcpChannel);
return new TraceableTcpTransportChannel(delegate, span, tracer, tcpChannel);
} else {
return delegate;
}
Expand All @@ -73,7 +77,7 @@

@Override
public String getProfileName() {
return delegate.getProfileName();

Check warning on line 80 in server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java#L80

Added line #L80 was not covered by tests
}

@Override
Expand All @@ -85,9 +89,9 @@
public void sendResponse(TransportResponse response) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(response);
} catch (final IOException ex) {
span.setError(ex);
throw ex;

Check warning on line 94 in server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/telemetry/tracing/channels/TraceableTcpTransportChannel.java#L92-L94

Added lines #L92 - L94 were not covered by tests
} finally {
span.endSpan();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

/**
* Base class TcpTransportChannel
*/
public abstract class BaseTcpTransportChannel implements TransportChannel {
private final TcpChannel channel;

/**
* Constructor.
* @param channel tcp channel
*/
public BaseTcpTransportChannel(TcpChannel channel) {
this.channel = channel;
}

/**
* Returns {@link TcpChannel}
* @return TcpChannel
*/
public TcpChannel getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
Expand Down Expand Up @@ -200,7 +200,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
assert message.isShortCircuit() == false;
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(stream, header.getVersion());
final TransportChannel transportChannel = new TcpTransportChannel(
final TcpTransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
Expand All @@ -211,7 +211,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel);
try {
handshaker.handleHandshake(traceableTransportChannel, requestId, stream);
} catch (Exception e) {
Expand All @@ -230,7 +230,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
}
}
} else {
final TransportChannel transportChannel = new TcpTransportChannel(
final TcpTransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
Expand All @@ -241,7 +241,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel);
try {
messageListener.onRequestReceived(requestId, action);
if (message.isShortCircuit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ public void processMessageReceived(Request request, TransportChannel channel) th

Releasable unregisterTask = () -> taskManager.unregister(task);
try {
if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
if (channel instanceof BaseTcpTransportChannel && task instanceof CancellableTask) {
reta marked this conversation as resolved.
Show resolved Hide resolved
if (request instanceof ShardSearchRequest) {
// on receiving request, update the inbound network time to reflect time spent in transit over the network
((ShardSearchRequest) request).setInboundNetworkTime(
Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime())
);
}
final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
final TcpChannel tcpChannel = ((BaseTcpTransportChannel) channel).getChannel();
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*
* @opensearch.internal
*/
public final class TcpTransportChannel implements TransportChannel {
public final class TcpTransportChannel extends BaseTcpTransportChannel {

private final AtomicBoolean released = new AtomicBoolean();
private final OutboundHandler outboundHandler;
Expand All @@ -70,6 +70,7 @@ public final class TcpTransportChannel implements TransportChannel {
boolean isHandshake,
Releasable breakerRelease
) {
super(channel);
this.version = version;
this.features = features;
this.channel = channel;
reta marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -131,7 +132,4 @@ public Version getVersion() {
return version;
}

public TcpChannel getChannel() {
return channel;
}
}
Loading