Skip to content

Commit

Permalink
Address review comment
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Oct 4, 2023
1 parent 438f66a commit 6b3c132
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.BaseTcpTransportChannel;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.TransportChannel;

Expand All @@ -23,7 +24,7 @@
/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTransportChannel implements TransportChannel {
public class TraceableTcpTransportChannel extends BaseTcpTransportChannel {

private final TransportChannel delegate;
private final Span span;
Expand All @@ -34,8 +35,10 @@ public class TraceableTransportChannel implements TransportChannel {
* @param delegate delegate
* @param span span
* @param tracer tracer
* @param channel channel
*/
public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel tcpChannel) {
public TraceableTcpTransportChannel(TransportChannel delegate, Span span, Tracer tracer, TcpChannel channel) {
super(channel);
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
Expand All @@ -47,6 +50,7 @@ public TraceableTransportChannel(TransportChannel delegate, Span span, Tracer tr
* @param delegate delegate
* @param span span
* @param tracer tracer
* @param tcpChannel tcpChannel
* @return transport channel
*/
public static TransportChannel create(TransportChannel delegate, final Span span, final Tracer tracer, final TcpChannel tcpChannel) {
Expand All @@ -65,7 +69,7 @@ public void onFailure(Exception e) {
}
});

return new TraceableTransportChannel(delegate, span, tracer, tcpChannel);
return new TraceableTcpTransportChannel(delegate, span, tracer, tcpChannel);
} else {
return delegate;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.transport;

/**
* Base class TcpTransportChannel
*/
public abstract class BaseTcpTransportChannel implements TransportChannel {
private final TcpChannel channel;

/**
* Constructor.
* @param channel tcp channel
*/
public BaseTcpTransportChannel(TcpChannel channel) {
this.channel = channel;
}

/**
* Returns {@link TcpChannel}
* @return TcpChannel
*/
public TcpChannel getChannel() {
return channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.opensearch.telemetry.tracing.SpanBuilder;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.telemetry.tracing.channels.TraceableTransportChannel;
import org.opensearch.telemetry.tracing.channels.TraceableTcpTransportChannel;
import org.opensearch.threadpool.ThreadPool;

import java.io.EOFException;
Expand Down Expand Up @@ -200,7 +200,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
assert message.isShortCircuit() == false;
final StreamInput stream = namedWriteableStream(message.openOrGetStreamInput());
assertRemoteVersion(stream, header.getVersion());
final TransportChannel transportChannel = new TcpTransportChannel(
final TcpTransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
Expand All @@ -211,7 +211,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel);
try {
handshaker.handleHandshake(traceableTransportChannel, requestId, stream);
} catch (Exception e) {
Expand All @@ -230,7 +230,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
}
}
} else {
final TransportChannel transportChannel = new TcpTransportChannel(
final TcpTransportChannel transportChannel = new TcpTransportChannel(
outboundHandler,
channel,
action,
Expand All @@ -241,7 +241,7 @@ private <T extends TransportRequest> void handleRequest(TcpChannel channel, Head
header.isHandshake(),
message.takeBreakerReleaseControl()
);
TransportChannel traceableTransportChannel = TraceableTransportChannel.create(transportChannel, span, tracer, channel);
TransportChannel traceableTransportChannel = TraceableTcpTransportChannel.create(transportChannel, span, tracer, channel);
try {
messageListener.onRequestReceived(requestId, action);
if (message.isShortCircuit()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ public void processMessageReceived(Request request, TransportChannel channel) th

Releasable unregisterTask = () -> taskManager.unregister(task);
try {
if (channel instanceof TcpTransportChannel && task instanceof CancellableTask) {
if (channel instanceof BaseTcpTransportChannel && task instanceof CancellableTask) {
if (request instanceof ShardSearchRequest) {
// on receiving request, update the inbound network time to reflect time spent in transit over the network
((ShardSearchRequest) request).setInboundNetworkTime(
Math.max(0, System.currentTimeMillis() - ((ShardSearchRequest) request).getInboundNetworkTime())
);
}
final TcpChannel tcpChannel = ((TcpTransportChannel) channel).getChannel();
final TcpChannel tcpChannel = ((BaseTcpTransportChannel) channel).getChannel();
final Releasable stopTracking = taskManager.startTrackingCancellableChannelTask(tcpChannel, (CancellableTask) task);
unregisterTask = Releasables.wrap(unregisterTask, stopTracking);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
*
* @opensearch.internal
*/
public final class TcpTransportChannel implements TransportChannel {
public final class TcpTransportChannel extends BaseTcpTransportChannel {

private final AtomicBoolean released = new AtomicBoolean();
private final OutboundHandler outboundHandler;
Expand All @@ -70,6 +70,7 @@ public final class TcpTransportChannel implements TransportChannel {
boolean isHandshake,
Releasable breakerRelease
) {
super(channel);
this.version = version;
this.features = features;
this.channel = channel;
Expand Down Expand Up @@ -131,7 +132,4 @@ public Version getVersion() {
return version;
}

public TcpChannel getChannel() {
return channel;
}
}

0 comments on commit 6b3c132

Please sign in to comment.