Suppress capturing responses #473

Merged
Changes from 2 commits
23 commits
a4caca7
Move addMetricsIfPresent into the metrics builder as a first class me…
gregschohn Nov 16, 2023
c026588
WIP to play with OpenTelemetry metric instruments and tracer spans.
gregschohn Nov 17, 2023
f3c0077
Get gradle files and docker-compose in order to support otlp exports …
gregschohn Nov 27, 2023
7fb8e2e
WIP
gregschohn Nov 27, 2023
a8ae3d1
Restore the docker-compose single-node/multi-node split docker-compos…
gregschohn Nov 28, 2023
da9d36b
Add labels to each metric instrument so that multiple values can be p…
gregschohn Nov 28, 2023
06618ca
Move the MetricsClosure into its own class and stop stuffing the metr…
gregschohn Nov 28, 2023
aba1aab
WIP - Cleanup + get Jaeger to work by switching the endpoint. Also i…
gregschohn Nov 28, 2023
900bc6d
Start moving away from ThreadLocal and 'current contexts' and toward …
gregschohn Nov 29, 2023
3746a8e
Get span parenting to work.
gregschohn Nov 30, 2023
e0e7bf1
Merge branch 'main' into DoNotMerge_MoreMetrics
gregschohn Nov 30, 2023
4b43262
Attempt to fix a failing unit test.
gregschohn Nov 30, 2023
322e12f
Refactor. Couple name changes, class package changes, and moved IRep…
gregschohn Nov 30, 2023
723bf77
Bundle all of the offloader spans with the netty handler spans.
gregschohn Dec 1, 2023
15a1705
Improve the tracing story for the capture proxy.
gregschohn Dec 2, 2023
8a6f52a
Tracing change: Flatten the flush span and just record it as 'blocked'.
gregschohn Dec 2, 2023
c50e01d
Minor cleanup - stop setting the namespace or trying to change in a p…
gregschohn Dec 4, 2023
17c517d
Start instrumenting the replayer with more contexts so that traces an…
gregschohn Dec 4, 2023
6288844
Double down on using Context objects in lieu of String labels and fix…
gregschohn Dec 11, 2023
09e849c
Merge branch 'FixKafkaResume' into OtelMetricsAndTraces
gregschohn Dec 11, 2023
9cf2540
Merge branch 'main' into OtelMetricsAndTraces
gregschohn Dec 12, 2023
c14da6a
Update the Http Logging Handler to suppress response packet captures …
gregschohn Dec 12, 2023
7de4009
File rename since the LoggingHttpRequest handler now handles both req…
gregschohn Dec 12, 2023
TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandler.java
@@ -3,31 +3,23 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCountUtil;
import io.opentelemetry.api.trace.Span;
import lombok.Getter;
import lombok.Lombok;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.tracing.IWithStartTimeAndAttributes;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
import org.opensearch.migrations.trafficcapture.netty.tracing.HttpMessageContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Predicate;

@Slf4j
public class ConditionallyReliableLoggingHttpRequestHandler<T> extends LoggingHttpRequestHandler<T> {
public class ConditionallyReliableLoggingHttpHandler<T> extends LoggingHttpHandler<T> {
private final Predicate<HttpRequest> shouldBlockPredicate;

public ConditionallyReliableLoggingHttpRequestHandler(@NonNull String nodeId, String connectionId,
@NonNull IConnectionCaptureFactory<T> trafficOffloaderFactory,
@NonNull RequestCapturePredicate requestCapturePredicate,
@NonNull Predicate<HttpRequest> headerPredicateForWhenToBlock)
public ConditionallyReliableLoggingHttpHandler(@NonNull String nodeId, String connectionId,
@NonNull IConnectionCaptureFactory<T> trafficOffloaderFactory,
@NonNull RequestCapturePredicate requestCapturePredicate,
@NonNull Predicate<HttpRequest> headerPredicateForWhenToBlock)
throws IOException {
super(nodeId, connectionId, trafficOffloaderFactory, requestCapturePredicate);
this.shouldBlockPredicate = headerPredicateForWhenToBlock;
@@ -52,7 +44,7 @@
// This is a spot where we would benefit from having a behavioral policy that different users
// could set as needed. Some users may be fine with just logging a failed offloading of a request
// where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
log.atWarn().setCause(t).setMessage("Dropping request - Got error").log();

Codecov / codecov/patch warning on line 47 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandler.java: added line #L47 was not covered by tests.
ReferenceCountUtil.release(msg);
} else {
try {
TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java
@@ -32,7 +32,7 @@
import java.time.Instant;

@Slf4j
public class LoggingHttpRequestHandler<T> extends ChannelDuplexHandler {
public class LoggingHttpHandler<T> extends ChannelDuplexHandler {
public static final String TELEMETRY_SCOPE_NAME = "CapturingHttpHandler";
public static final SimpleMeteringClosure METERING_CLOSURE = new SimpleMeteringClosure(TELEMETRY_SCOPE_NAME);
private static final MetricsLogger metricsLogger = new MetricsLogger("LoggingHttpRequestHandler");
@@ -41,11 +41,40 @@
public static final String GATHERING_RESPONSE = "gatheringResponse";
public static final String BLOCKED = "blocked";

static class CaptureIgnoreState {
static final byte CAPTURE = 0;
static final byte IGNORE_REQUEST = 1;
static final byte IGNORE_RESPONSE = 2;
private CaptureIgnoreState() {}
}

static class CaptureState {
byte captureIgnoreState = CaptureIgnoreState.CAPTURE;
boolean liveReadObservationsInOffloader = false;

boolean shouldCapture() {
return captureIgnoreState == CaptureIgnoreState.CAPTURE;
}

public void setShouldCaptureForRequest(boolean b) {
captureIgnoreState = b ? CaptureIgnoreState.CAPTURE : CaptureIgnoreState.IGNORE_REQUEST;
}

public void advanceStateModelIntoResponseGather() {
if (CaptureIgnoreState.CAPTURE != captureIgnoreState) {
captureIgnoreState = CaptureIgnoreState.IGNORE_RESPONSE;
}
}
}

static class SimpleHttpRequestDecoder extends HttpRequestDecoder {
private final PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve;
private final CaptureState captureState;

public SimpleHttpRequestDecoder(@NonNull PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve) {
public SimpleHttpRequestDecoder(@NonNull PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve,
CaptureState captureState) {
this.headersToPreserve = headersToPreserve;
this.captureState = captureState;
}

/**
@@ -60,46 +89,50 @@
, new PassThruHttpHeaders(headersToPreserve)
);
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (captureState.captureIgnoreState == CaptureIgnoreState.IGNORE_RESPONSE) {
captureState.captureIgnoreState = CaptureIgnoreState.CAPTURE;
}
Review comment on lines +95 to +97

Collaborator: Trying to understand this. If the state is IGNORE_RESPONSE and a read arrives, the state is set back to CAPTURE. Is that because reads here can only be request packets, so it is safe to reset to CAPTURE, or is something else happening?

Collaborator Author: The default assumption is that every request and its response will be captured until a request is marked IGNORE_REQUEST. This is a reset, since a new request has begun. And yes, reads are only for requests and writes are only for responses, which is why we now have two ignore states.
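
The state transitions discussed in this thread can be sketched in isolation. The following is a minimal, standalone model and not the PR's code: `CaptureStateSketch`, its method names, and the `simulate` helper are hypothetical, an enum stands in for the byte constants of `CaptureIgnoreState`, and the offloader (including `cancelCaptureForCurrentRequest`) is replaced by a plain list of captured events.

```java
import java.util.ArrayList;
import java.util.List;

public class CaptureStateSketch {
    // Stand-in for the three byte constants in the PR's CaptureIgnoreState.
    enum State { CAPTURE, IGNORE_REQUEST, IGNORE_RESPONSE }

    private State state = State.CAPTURE;
    // Hypothetical stand-in for the traffic offloader: records what was captured.
    private final List<String> captured = new ArrayList<>();

    boolean shouldCapture() {
        return state == State.CAPTURE;
    }

    State currentState() {
        return state;
    }

    // Reads only carry request bytes, so a read that arrives while the previous
    // response is being ignored means a new request has begun: reset to CAPTURE.
    void onReadStarted() {
        if (state == State.IGNORE_RESPONSE) {
            state = State.CAPTURE;
        }
    }

    // Called once the request headers are decoded and the predicate has run.
    void onHeadersParsed(boolean predicateSaysCapture) {
        state = predicateSaysCapture ? State.CAPTURE : State.IGNORE_REQUEST;
    }

    // Called when the full request has been parsed; an ignored request
    // causes its response to be ignored as well.
    void onRequestFullyParsed() {
        if (state != State.CAPTURE) {
            state = State.IGNORE_RESPONSE;
        }
    }

    void read(String label) {
        if (shouldCapture()) {
            captured.add("read:" + label);
        }
    }

    void write(String label) {
        if (shouldCapture()) {
            captured.add("write:" + label);
        }
    }

    // Runs three request/response exchanges; the predicate rejects only B.
    static List<String> simulate() {
        CaptureStateSketch s = new CaptureStateSketch();
        String[] labels = {"A", "B", "C"};
        boolean[] capture = {true, false, true};
        for (int i = 0; i < labels.length; i++) {
            s.onReadStarted();
            s.onHeadersParsed(capture[i]);
            s.read(labels[i]);
            s.onRequestFullyParsed();
            s.write(labels[i]);
        }
        return s.captured;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // [read:A, write:A, read:C, write:C]
    }
}
```

In this model the response to B is suppressed because the state moves from IGNORE_REQUEST to IGNORE_RESPONSE once the request is fully parsed, and the first read for C resets it to CAPTURE before the predicate is consulted again.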

super.channelRead(ctx, msg);
}
}

static class SimpleDecodedHttpRequestHandler extends ChannelInboundHandlerAdapter {
@Getter

Codecov / codecov/patch warning on line 103 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added line #L103 was not covered by tests.
private HttpRequest currentRequest;
final RequestCapturePredicate requestCapturePredicate;
boolean isDone;
boolean shouldCapture;
boolean liveReadObservationsInOffloader;

SimpleDecodedHttpRequestHandler(RequestCapturePredicate requestCapturePredicate) {
boolean haveParsedFullRequest;
final CaptureState captureState;

SimpleDecodedHttpRequestHandler(RequestCapturePredicate requestCapturePredicate, CaptureState captureState) {
this.requestCapturePredicate = requestCapturePredicate;
this.currentRequest = null;
this.isDone = false;
this.shouldCapture = true;
liveReadObservationsInOffloader = false;
this.haveParsedFullRequest = false;
this.captureState = captureState;
}

@Override
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
if (msg instanceof HttpRequest) {
currentRequest = (HttpRequest) msg;
shouldCapture = RequestCapturePredicate.CaptureDirective.CAPTURE ==
requestCapturePredicate.apply((HttpRequest) msg);
captureState.setShouldCaptureForRequest(RequestCapturePredicate.CaptureDirective.CAPTURE ==
requestCapturePredicate.apply((HttpRequest) msg));
} else if (msg instanceof HttpContent) {
((HttpContent)msg).release();
if (msg instanceof LastHttpContent) {
isDone = true;
haveParsedFullRequest = true;
}
} else {
super.channelRead(ctx, msg);

Codecov / codecov/patch warning on line 128 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added line #L128 was not covered by tests.
}
}

public HttpRequest resetCurrentRequest() {
this.shouldCapture = true;
this.isDone = false;
this.haveParsedFullRequest = false;
var old = currentRequest;
this.currentRequest = null;
this.liveReadObservationsInOffloader = false;
return old;
}
}
@@ -110,9 +143,9 @@

protected HttpMessageContext messageContext;

public LoggingHttpRequestHandler(String nodeId, String channelKey,
@NonNull IConnectionCaptureFactory<T> trafficOffloaderFactory,
@NonNull RequestCapturePredicate httpHeadersCapturePredicate)
public LoggingHttpHandler(String nodeId, String channelKey,
@NonNull IConnectionCaptureFactory<T> trafficOffloaderFactory,
@NonNull RequestCapturePredicate httpHeadersCapturePredicate)
throws IOException {
var parentContext = new ConnectionContext(channelKey, nodeId,
METERING_CLOSURE.makeSpanContinuation("connectionLifetime", null));
@@ -122,13 +155,13 @@
METERING_CLOSURE.meterIncrementEvent(messageContext, "requestStarted");

this.trafficOffloader = trafficOffloaderFactory.createOffloader(parentContext, channelKey);
var captureState = new CaptureState();
httpDecoderChannel = new EmbeddedChannel(
new SimpleHttpRequestDecoder(httpHeadersCapturePredicate.getHeadersRequiredForMatcher()),
new SimpleDecodedHttpRequestHandler(httpHeadersCapturePredicate)
new SimpleHttpRequestDecoder(httpHeadersCapturePredicate.getHeadersRequiredForMatcher(), captureState),
new SimpleDecodedHttpRequestHandler(httpHeadersCapturePredicate, captureState)
);
}


static String getSpanLabelForState(HttpMessageContext.HttpTransactionState state) {
switch (state) {
case REQUEST:
@@ -140,7 +173,7 @@
case RESPONSE:
return GATHERING_RESPONSE;
default:
throw new IllegalStateException("Unknown enum value: "+state);

Codecov / codecov/patch warning on line 176 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added line #L176 was not covered by tests.
}
}

@@ -162,13 +195,13 @@
METERING_CLOSURE.meterIncrementEvent(messageContext, "unregistered");
trafficOffloader.flushCommitAndResetStream(true).whenComplete((result, t) -> {
if (t != null) {
log.warn("Got error: " + t.getMessage());
ctx.close();

Codecov / codecov/patch warning on line 199 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added lines #L198 - L199 were not covered by tests.
} else {
try {
super.channelUnregistered(ctx);
} catch (Exception e) {
throw Lombok.sneakyThrow(e);

Codecov / codecov/patch warning on line 204 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added lines #L203 - L204 were not covered by tests.
}
}
});
@@ -182,12 +215,12 @@

trafficOffloader.flushCommitAndResetStream(true).whenComplete((result, t) -> {
if (t != null) {
log.warn("Got error: " + t.getMessage());

Codecov / codecov/patch warning on line 218 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added line #L218 was not covered by tests.
}
try {
super.channelUnregistered(ctx);
} catch (Exception e) {
throw Lombok.sneakyThrow(e);

Codecov / codecov/patch warning on line 223 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added lines #L222 - L223 were not covered by tests.
}
});
super.handlerRemoved(ctx);
@@ -217,16 +250,16 @@
httpDecoderChannel.writeInbound(bb.retainedDuplicate()); // the ByteBuf is consumed/release by this method

METERING_CLOSURE.meterIncrementEvent(messageContext,
getHandlerThatHoldsParsedHttpRequest().isDone ? "requestFullyParsed" : "requestPartiallyParsed");
getHandlerThatHoldsParsedHttpRequest().haveParsedFullRequest ? "requestFullyParsed" : "requestPartiallyParsed");

var shouldCapture = requestParsingHandler.shouldCapture;
var captureState = requestParsingHandler.captureState;
var shouldCapture = captureState.shouldCapture();
if (shouldCapture) {
requestParsingHandler.liveReadObservationsInOffloader = true;
captureState.liveReadObservationsInOffloader = true;
trafficOffloader.addReadEvent(timestamp, bb);

} else if (requestParsingHandler.liveReadObservationsInOffloader) {
} else if (captureState.liveReadObservationsInOffloader) {
trafficOffloader.cancelCaptureForCurrentRequest(timestamp);
requestParsingHandler.liveReadObservationsInOffloader = false;
captureState.liveReadObservationsInOffloader = false;
}

metricsLogger.atSuccess(MetricsEvent.RECEIVED_REQUEST_COMPONENT)
@@ -235,9 +268,12 @@
METERING_CLOSURE.meterIncrementEvent(messageContext, "readBytes", bb.readableBytes());


if (requestParsingHandler.isDone) {
if (requestParsingHandler.haveParsedFullRequest) {
messageContext.getCurrentSpan().end();
var httpRequest = requestParsingHandler.resetCurrentRequest();
captureState.liveReadObservationsInOffloader = false;
captureState.advanceStateModelIntoResponseGather();

if (shouldCapture) {
var decoderResultLoose = httpRequest.decoderResult();
if (decoderResultLoose instanceof HttpMessageDecoderResult) {
@@ -260,7 +296,9 @@
rotateNextMessageContext(HttpMessageContext.HttpTransactionState.RESPONSE);
}
var bb = (ByteBuf) msg;
trafficOffloader.addWriteEvent(Instant.now(), bb);
if (getHandlerThatHoldsParsedHttpRequest().captureState.shouldCapture()) {
trafficOffloader.addWriteEvent(Instant.now(), bb);
}
metricsLogger.atSuccess(MetricsEvent.RECEIVED_RESPONSE_COMPONENT)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()).emit();
METERING_CLOSURE.meterIncrementEvent(messageContext, "write");
@@ -271,10 +309,10 @@

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
trafficOffloader.addExceptionCaughtEvent(Instant.now(), cause);
METERING_CLOSURE.meterIncrementEvent(messageContext, "exception");
httpDecoderChannel.close();
super.exceptionCaught(ctx, cause);
}

Codecov / codecov/patch warning on line 316 in TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpHandler.java: added lines #L312 - L316 were not covered by tests.

}

This file was deleted.
