Skip to content

Commit

Permalink
[Tracing Instrumentation] Add instrumentation in InboundHandler (#10143)
Browse files Browse the repository at this point in the history
* Add instrumentation in InboundHandler

Signed-off-by: Gagan Juneja <[email protected]>

* Add CHANGELOG

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Empty-Commit

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

* Address review comment

Signed-off-by: Gagan Juneja <[email protected]>

---------

Signed-off-by: Gagan Juneja <[email protected]>
Signed-off-by: Gagan Juneja <[email protected]>
Co-authored-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gaganjuneja and Gagan Juneja authored Oct 5, 2023
1 parent 2965e69 commit aac0f09
Show file tree
Hide file tree
Showing 41 changed files with 374 additions and 129 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Tracing Framework] Add support for SpanKind. ([#10122](https://github.com/opensearch-project/OpenSearch/pull/10122))
- Pass parent filter to inner query in nested query ([#10246](https://github.com/opensearch-project/OpenSearch/pull/10246))
- Disable concurrent segment search when terminate_after is used ([#10200](https://github.com/opensearch-project/OpenSearch/pull/10200))
- Add instrumentation in Inbound Handler. ([#100143](https://github.com/opensearch-project/OpenSearch/pull/10143))
- Enable remote segment upload backpressure by default ([#10356](https://github.com/opensearch-project/OpenSearch/pull/10356))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NETTY_TRANSPORT_NAME,
Expand All @@ -108,7 +109,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getSharedGroupFactory(settings)
getSharedGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Netty4NioSocketChannel;
import org.opensearch.transport.NettyAllocator;
Expand Down Expand Up @@ -131,9 +132,10 @@ public Netty4Transport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
SharedGroupFactory sharedGroupFactory
SharedGroupFactory sharedGroupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
Netty4Utils.setAvailableProcessors(OpenSearchExecutors.NODE_PROCESSORS_SETTING.get(settings));
NettyAllocator.logAllocatorDescriptionIfNeeded();
this.sharedGroupFactory = sharedGroupFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.SharedGroupFactory;
Expand Down Expand Up @@ -86,7 +87,8 @@ public void startThreadPool() {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
nettyTransport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.util.PageCacheRecycler;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -141,7 +142,8 @@ private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
recycler,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
);
transport.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -82,7 +83,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
PageCacheRecycler.NON_RECYCLING_INSTANCE,
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new SharedGroupFactory(settings)
new SharedGroupFactory(settings),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ protected MockTransportService createTransportService() {
new NetworkService(Collections.emptyList()),
PageCacheRecycler.NON_RECYCLING_INSTANCE,
writableRegistry(),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
) {
@Override
public TransportAddress[] addressesFromString(String address) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ protected MockTransportService createTransportService() {
networkService,
PageCacheRecycler.NON_RECYCLING_INSTANCE,
new NamedWriteableRegistry(Collections.emptyList()),
new NoneCircuitBreakerService()
new NoneCircuitBreakerService(),
NoopTracer.INSTANCE
),
threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public void addAttribute(String key, Boolean value) {

@Override
public void setError(Exception exception) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());
if (exception != null) {
delegateSpan.setStatus(StatusCode.ERROR, exception.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.nio.NioSelector;
import org.opensearch.nio.NioSocketChannel;
import org.opensearch.nio.ServerChannelContext;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TcpTransport;
import org.opensearch.transport.TransportSettings;
Expand Down Expand Up @@ -84,9 +85,10 @@ protected NioTransport(
PageCacheRecycler pageCacheRecycler,
NamedWriteableRegistry namedWriteableRegistry,
CircuitBreakerService circuitBreakerService,
NioGroupFactory groupFactory
NioGroupFactory groupFactory,
Tracer tracer
) {
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
super(settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService, tracer);
this.pageAllocator = new PageAllocator(pageCacheRecycler);
this.groupFactory = groupFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.singletonMap(
NIO_TRANSPORT_NAME,
Expand All @@ -103,7 +104,8 @@ public Map<String, Supplier<Transport>> getTransports(
pageCacheRecycler,
namedWriteableRegistry,
circuitBreakerService,
getNioGroupFactory(settings)
getNioGroupFactory(settings),
tracer
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.test.transport.StubbableTransport;
import org.opensearch.transport.AbstractSimpleTransportTestCase;
Expand Down Expand Up @@ -81,7 +82,8 @@ protected Transport build(Settings settings, final Version version, ClusterSetti
new MockPageCacheRecycler(settings),
namedWriteableRegistry,
new NoneCircuitBreakerService(),
new NioGroupFactory(settings, logger)
new NioGroupFactory(settings, logger),
NoopTracer.INSTANCE
) {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public NetworkModule(
pageCacheRecycler,
circuitBreakerService,
namedWriteableRegistry,
networkService
networkService,
tracer
);
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ default Map<String, Supplier<Transport>> getTransports(
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
NamedWriteableRegistry namedWriteableRegistry,
NetworkService networkService
NetworkService networkService,
Tracer tracer
) {
return Collections.emptyMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ private AttributeNames() {
*/
public static final String TRANSPORT_TARGET_HOST = "target_host";

/**
* Transport Service send request local host.
*/
public static final String TRANSPORT_HOST = "host";

/**
* Action Name.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.http.HttpRequest;
import org.opensearch.rest.RestRequest;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.Transport;

import java.util.Arrays;
Expand Down Expand Up @@ -127,4 +128,26 @@ private static Attributes buildSpanAttributes(String action, Transport.Connectio
return attributes;
}

/**
* Creates {@link SpanCreationContext} from Inbound Handler.
* @param action action.
* @param tcpChannel tcp channel.
* @return context
*/
public static SpanCreationContext from(String action, TcpChannel tcpChannel) {
return SpanCreationContext.server().name(createSpanName(action, tcpChannel)).attributes(buildSpanAttributes(action, tcpChannel));
}

private static String createSpanName(String action, TcpChannel tcpChannel) {
return action + SEPARATOR + (tcpChannel.getRemoteAddress() != null
? tcpChannel.getRemoteAddress().getHostString()
: tcpChannel.getLocalAddress().getHostString());
}

private static Attributes buildSpanAttributes(String action, TcpChannel tcpChannel) {
Attributes attributes = Attributes.create().addAttribute(AttributeNames.TRANSPORT_ACTION, action);
attributes.addAttribute(AttributeNames.TRANSPORT_HOST, tcpChannel.getLocalAddress().getHostString());
return attributes;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.channels;

import org.opensearch.Version;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.SpanScope;
import org.opensearch.telemetry.tracing.Tracer;
import org.opensearch.transport.BaseTcpTransportChannel;
import org.opensearch.transport.TcpTransportChannel;
import org.opensearch.transport.TransportChannel;

import java.io.IOException;

/**
* Tracer wrapped {@link TransportChannel}
*/
public class TraceableTcpTransportChannel extends BaseTcpTransportChannel {

private final TransportChannel delegate;
private final Span span;
private final Tracer tracer;

/**
* Constructor.
* @param delegate delegate
* @param span span
* @param tracer tracer
*/
public TraceableTcpTransportChannel(TcpTransportChannel delegate, Span span, Tracer tracer) {
super(delegate.getChannel());
this.delegate = delegate;
this.span = span;
this.tracer = tracer;
}

/**
* Factory method.
*
* @param delegate delegate
* @param span span
* @param tracer tracer
* @return transport channel
*/
public static TransportChannel create(TcpTransportChannel delegate, final Span span, final Tracer tracer) {
if (FeatureFlags.isEnabled(FeatureFlags.TELEMETRY) == true) {
delegate.getChannel().addCloseListener(new ActionListener<Void>() {
@Override
public void onResponse(Void unused) {
onFailure(null);
}

@Override
public void onFailure(Exception e) {
span.addEvent("The TransportChannel was closed without sending the response");
span.setError(e);
span.endSpan();
}
});

return new TraceableTcpTransportChannel(delegate, span, tracer);
} else {
return delegate;
}
}

@Override
public String getProfileName() {
return delegate.getProfileName();
}

@Override
public String getChannelType() {
return delegate.getChannelType();
}

@Override
public void sendResponse(TransportResponse response) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(response);
} catch (final IOException ex) {
span.setError(ex);
throw ex;
} finally {
span.endSpan();
}
}

@Override
public void sendResponse(Exception exception) throws IOException {
try (SpanScope scope = tracer.withSpanInScope(span)) {
delegate.sendResponse(exception);
} finally {
span.setError(exception);
span.endSpan();
}
}

@Override
public Version getVersion() {
return delegate.getVersion();
}
}
Loading

0 comments on commit aac0f09

Please sign in to comment.