PR #468: Drop user-agent matches, and a bugfix for over-committing KafkaRecords too early

Merged: 7 commits, Dec 11, 2023
TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java
@@ -78,4 +78,6 @@
default CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IOException {
return CompletableFuture.completedFuture(null);
}

default void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {}

[Codecov: added line 82 was not covered by tests]
}

@@ -13,6 +13,7 @@
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.ReadSegmentObservation;
import org.opensearch.migrations.trafficcapture.protos.RequestIntentionallyDropped;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
@@ -204,6 +205,16 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IOException {
return future;
}

@Override
public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {
Collaborator: If I understand correctly, we will still write any packets up to the point that we can decipher the headers, at which point we will add this new observation to signify that the preceding observations can be ignored. It seems like ideally we wouldn't even write them to Kafka, but I do agree that that is not necessarily an easy thing to do with our current logic.

Collaborator (author): That is correct. If a caller wants to make sure that there's no trace, it's reasonable that they'll be able to send in a request with the filtering logic early enough (first packet). That said, we don't parse the headers as they come in, but rather once they've all arrived. Doing them as they come in would be a really nice-to-have.

However, there may be cases that are spread across time. We have two choices: buffer and manage, or offload ASAP (keeping in mind that there's a lot of buffering through the stacks; it's just not super easy to retract). Not capturing is an optimization on the wire protocol, but at the expense of compute & memory for the proxy. Not replaying is the visible high-level requirement, which we'll meet by adding the tombstone. Trying to do more for what could be very rare cases, considering that we're going to be set up to log nearly all of the traffic anyway, doesn't seem like an investment that would ever pay off (as much as it pains me to NOT do that wire optimization).

beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1);
Collaborator: Do we have any logic on the Replayer to handle this new observation?

Collaborator (author): Good question - as of when you reviewed this, no. I've pushed a new commit. I had gotten distracted yesterday when I published it and had forgotten to walk through the unit tests to make sure that they were complete (they weren't, and they broke the same way that the code broke).

getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER,
RequestIntentionallyDropped.getDefaultInstance());
this.readObservationsAreWaitingForEom = false;
this.firstLineByteLength = -1;
this.headersByteLength = -1;
}

@Override
public void addBindEvent(Instant timestamp, SocketAddress addr) throws IOException {
// not implemented for this serializer. The v1.0 version of the replayer will ignore this type of observation
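A minimal sketch of what honoring the tombstone could look like on the consuming side. This is illustrative only - the accumulator class and method names below are hypothetical, not the Replayer's actual API; only the TrafficObservation accessors come from the generated protobuf code.

    import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;

    // Hypothetical per-connection consumer; buffering is reduced to a
    // StringBuilder to keep the sketch self-contained.
    class ConnectionAccumulatorSketch {
        private final StringBuilder bufferedRequestBytes = new StringBuilder();

        void onObservation(TrafficObservation tso) {
            switch (tso.getCaptureCase()) {
                case READ:
                    // Reads may have been committed before the headers could be inspected.
                    bufferedRequestBytes.append(tso.getRead().getData().toStringUtf8());
                    break;
                case REQUESTDROPPED:
                    // The tombstone: discard everything accumulated for the
                    // current request instead of replaying it.
                    bufferedRequestBytes.setLength(0);
                    break;
                default:
                    break; // other observation types elided
            }
        }
    }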

@@ -41,6 +41,7 @@ message EndOfMessageIndication {
optional int32 firstLineByteLength = 1;
optional int32 headersByteLength = 2;
}
message RequestIntentionallyDropped {}

message TrafficObservation {
google.protobuf.Timestamp ts = 1;
@@ -61,6 +62,8 @@ message TrafficObservation {
// having been committed to the stream.
EndOfSegmentsIndication segmentEnd = 14;
EndOfMessageIndication endOfMessageIndicator = 15;

RequestIntentionallyDropped requestDropped = 16;
}
}

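For reference, a hedged sketch of building the new observation through the generated protobuf Java builders (assuming the standard protobuf-java codegen; the serializer above writes it more directly through its CodedOutputStream):

    import com.google.protobuf.Timestamp;
    import org.opensearch.migrations.trafficcapture.protos.RequestIntentionallyDropped;
    import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;

    import java.time.Instant;

    static TrafficObservation makeTombstone(Instant timestamp) {
        // requestDropped carries no payload, so the default instance suffices.
        return TrafficObservation.newBuilder()
                .setTs(Timestamp.newBuilder()
                        .setSeconds(timestamp.getEpochSecond())
                        .setNanos(timestamp.getNano())
                        .build())
                .setRequestDropped(RequestIntentionallyDropped.getDefaultInstance())
                .build();
    }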

@@ -1,6 +1,8 @@
package org.opensearch.migrations.trafficcapture.protos;

import com.google.protobuf.Timestamp;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Optional;
import java.util.stream.Collectors;
@@ -22,11 +24,32 @@ public static Optional<Instant> getFirstTimestamp(TrafficStream ts) {

public static String summarizeTrafficStream(TrafficStream ts) {
var listSummaryStr = ts.getSubStreamList().stream()
.map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase()))
.map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase())
+ getOptionalContext(tso))
.collect(Collectors.joining(", "));
return ts.getConnectionId() + " (#" + getTrafficStreamIndex(ts) + ")[" + listSummaryStr + "]";
}

private static Object getOptionalContext(TrafficObservation tso) {
return Optional.ofNullable(getByteArrayForDataOf(tso))
.map(b->" " + new String(b, 0, Math.min(3, b.length), StandardCharsets.UTF_8))
.orElse("");
}

private static byte[] getByteArrayForDataOf(TrafficObservation tso) {
if (tso.hasRead()) {
return tso.getRead().getData().toByteArray();
} else if (tso.hasReadSegment()) {
return tso.getReadSegment().getData().toByteArray();
} else if (tso.hasWrite()) {
return tso.getWrite().getData().toByteArray();
} else if (tso.hasWriteSegment()) {
return tso.getWriteSegment().getData().toByteArray();
} else {
return null;
}
}

public static int getTrafficStreamIndex(TrafficStream ts) {
return ts.hasNumber() ? ts.getNumber() : ts.getNumberOfThisLastChunk();
}
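A hedged usage sketch of the enriched summary (the connection id and the exact rendering below are invented for illustration; the new getOptionalContext hook appends up to the first 3 bytes of each read/write payload):

    // Might print something like:
    //   conn-1 (#0)[2023-12-11T00:00:00Z: READ GET, 2023-12-11T00:00:01Z: EOM]
    // where "GET" is the first three bytes of the read observation's data.
    System.out.println(summarizeTrafficStream(trafficStream));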

TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java
@@ -4,6 +4,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCountUtil;
import lombok.Lombok;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;

@@ -13,33 +14,35 @@
public class ConditionallyReliableLoggingHttpRequestHandler<T> extends LoggingHttpRequestHandler<T> {
private final Predicate<HttpRequest> shouldBlockPredicate;

public ConditionallyReliableLoggingHttpRequestHandler(IChannelConnectionCaptureSerializer<T> trafficOffloader,
Predicate<HttpRequest> headerPredicateForWhenToBlock) {
super(trafficOffloader);
public ConditionallyReliableLoggingHttpRequestHandler(@NonNull IChannelConnectionCaptureSerializer<T> trafficOffloader,
@NonNull RequestCapturePredicate requestCapturePredicate,
@NonNull Predicate<HttpRequest> headerPredicateForWhenToBlock) {
super(trafficOffloader, requestCapturePredicate);
this.shouldBlockPredicate = headerPredicateForWhenToBlock;
}

@Override
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest)
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg,
boolean shouldCapture, HttpRequest httpRequest)
throws Exception {
if (shouldBlockPredicate.test(httpRequest)) {
if (shouldCapture && shouldBlockPredicate.test(httpRequest)) {
trafficOffloader.flushCommitAndResetStream(false).whenComplete((result, t) -> {
if (t != null) {
// This is a spot where we would benefit from having a behavioral policy that different users
// could set as needed. Some users may be fine with just logging a failed offloading of a request
// where other users may want to stop entirely. JIRA here: https://opensearch.atlassian.net/browse/MIGRATIONS-1276
log.atWarn().setCause(t).setMessage("Got error").log();

[Codecov: added line 34 was not covered by tests]
ReferenceCountUtil.release(msg);
} else {
try {
super.channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest);
super.channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest);
} catch (Exception e) {
throw Lombok.sneakyThrow(e);
}
}
});
} else {
super.channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest);
super.channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest);
}
}
}

HeaderValueFilteringCapturePredicate.java (new file)
@@ -0,0 +1,29 @@
package org.opensearch.migrations.trafficcapture.netty;


import io.netty.handler.codec.http.HttpRequest;

import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class HeaderValueFilteringCapturePredicate extends RequestCapturePredicate {
private final Map<String, Pattern> headerToPredicateRegexMap;

public HeaderValueFilteringCapturePredicate(Map<String, String> suppressCaptureHeaderPairs) {
super(new PassThruHttpHeaders.HttpHeadersToPreserve(suppressCaptureHeaderPairs.keySet()
.toArray(String[]::new)));
headerToPredicateRegexMap = suppressCaptureHeaderPairs.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, kvp->Pattern.compile(kvp.getValue())));
}

@Override
public CaptureDirective apply(HttpRequest request) {
return headerToPredicateRegexMap.entrySet().stream().anyMatch(kvp->
Optional.ofNullable(request.headers().get(kvp.getKey()))
.map(v->kvp.getValue().matcher(v).matches())
.orElse(false)
) ? CaptureDirective.DROP : CaptureDirective.CAPTURE;
}
}
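
A hedged wiring sketch tying this new predicate to the PR title, i.e. dropping capture for matching user agents. The header name and regex are example values, and trafficOffloader plus the blocking predicate are assumed to come from the surrounding proxy setup:

    import io.netty.handler.codec.http.HttpHeaderNames;

    import java.util.Map;

    // Suppress capture for any request whose User-Agent matches the regex.
    var capturePredicate = new HeaderValueFilteringCapturePredicate(
            Map.of(HttpHeaderNames.USER_AGENT.toString(), ".*ELB-HealthChecker.*"));

    var handler = new ConditionallyReliableLoggingHttpRequestHandler<>(
            trafficOffloader, capturePredicate, httpRequest -> true);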

LoggingHttpRequestHandler.java
@@ -15,6 +15,7 @@
import io.netty.handler.codec.http.LastHttpContent;
import lombok.Getter;
import lombok.Lombok;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.coreutils.MetricsAttributeKey;
import org.opensearch.migrations.coreutils.MetricsEvent;
@@ -28,6 +29,12 @@ public class LoggingHttpRequestHandler<T> extends ChannelInboundHandlerAdapter {
private static final MetricsLogger metricsLogger = new MetricsLogger("LoggingHttpRequestHandler");

static class SimpleHttpRequestDecoder extends HttpRequestDecoder {
private final PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve;

public SimpleHttpRequestDecoder(@NonNull PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve) {
this.headersToPreserve = headersToPreserve;
}

/**
* Override this so that the HttpHeaders object can be a cheaper one. PassThruHeaders
* only stores a handful of headers that are required for parsing the payload portion
@@ -37,19 +44,33 @@ static class SimpleHttpRequestDecoder extends HttpRequestDecoder {
public HttpMessage createMessage(String[] initialLine) throws Exception {
return new DefaultHttpRequest(HttpVersion.valueOf(initialLine[2]),
HttpMethod.valueOf(initialLine[0]), initialLine[1]
, new PassThruHttpHeaders()
, new PassThruHttpHeaders(headersToPreserve)
);
}
}

static class SimpleDecodedHttpRequestHandler extends ChannelInboundHandlerAdapter {
@Getter
private HttpRequest currentRequest;
final RequestCapturePredicate requestCapturePredicate;
boolean isDone;
boolean shouldCapture;
boolean liveReadObservationsInOffloader;

SimpleDecodedHttpRequestHandler(RequestCapturePredicate requestCapturePredicate) {
this.requestCapturePredicate = requestCapturePredicate;
this.currentRequest = null;
this.isDone = false;
this.shouldCapture = true;
liveReadObservationsInOffloader = false;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception {
if (msg instanceof HttpRequest) {
currentRequest = (HttpRequest) msg;
shouldCapture = RequestCapturePredicate.CaptureDirective.CAPTURE ==
requestCapturePredicate.apply((HttpRequest) msg);
} else if (msg instanceof HttpContent) {
((HttpContent)msg).release();
if (msg instanceof LastHttpContent) {
@@ -61,35 +82,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}

public HttpRequest resetCurrentRequest() {
isDone = false;
this.shouldCapture = true;
this.isDone = false;
var old = currentRequest;
currentRequest = null;
this.currentRequest = null;
this.liveReadObservationsInOffloader = false;
return old;
}
}

protected final IChannelConnectionCaptureSerializer<T> trafficOffloader;

protected final EmbeddedChannel httpDecoderChannel;
protected final SimpleHttpRequestDecoder requestDecoder;


public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer<T> trafficOffloader) {
public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer<T> trafficOffloader,
@NonNull RequestCapturePredicate httpHeadersCapturePredicate) {
this.trafficOffloader = trafficOffloader;
requestDecoder = new SimpleHttpRequestDecoder(); // as a field for easier debugging
httpDecoderChannel = new EmbeddedChannel(
requestDecoder,
new SimpleDecodedHttpRequestHandler()
new SimpleHttpRequestDecoder(httpHeadersCapturePredicate.getHeadersRequiredForMatcher()),
new SimpleDecodedHttpRequestHandler(httpHeadersCapturePredicate)
);
}

private HttpProcessedState parseHttpMessageParts(ByteBuf msg) {
httpDecoderChannel.writeInbound(msg); // Consume this outright, up to the caller to know what else to do
return getHandlerThatHoldsParsedHttpRequest().isDone ?
HttpProcessedState.FULL_MESSAGE :
HttpProcessedState.ONGOING;
}

private SimpleDecodedHttpRequestHandler getHandlerThatHoldsParsedHttpRequest() {
return (SimpleDecodedHttpRequestHandler) httpDecoderChannel.pipeline().last();
}
@@ -126,7 +140,8 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
}

protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) throws Exception {
protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, boolean shouldCapture,
HttpRequest httpRequest) throws Exception {
super.channelRead(ctx, msg);
metricsLogger.atSuccess(MetricsEvent.RECEIVED_FULL_HTTP_REQUEST)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText())
@@ -137,25 +152,32 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg,
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
var timestamp = Instant.now();
HttpProcessedState httpProcessedState;
{
var bb = ((ByteBuf) msg).retainedDuplicate();
var requestParsingHandler = getHandlerThatHoldsParsedHttpRequest();
var bb = ((ByteBuf) msg);
httpDecoderChannel.writeInbound(bb.retainedDuplicate()); // the ByteBuf is consumed/release by this method
var shouldCapture = requestParsingHandler.shouldCapture;
if (shouldCapture) {
requestParsingHandler.liveReadObservationsInOffloader = true;
trafficOffloader.addReadEvent(timestamp, bb);
metricsLogger.atSuccess(MetricsEvent.RECEIVED_REQUEST_COMPONENT)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()).emit();

httpProcessedState = parseHttpMessageParts(bb); // bb is consumed/release by this method
} else if (requestParsingHandler.liveReadObservationsInOffloader) {
trafficOffloader.cancelCaptureForCurrentRequest(timestamp);
requestParsingHandler.liveReadObservationsInOffloader = false;
}
if (httpProcessedState == HttpProcessedState.FULL_MESSAGE) {
var httpRequest = getHandlerThatHoldsParsedHttpRequest().resetCurrentRequest();
var decoderResultLoose = httpRequest.decoderResult();
if (decoderResultLoose instanceof HttpMessageDecoderResult) {
var decoderResult = (HttpMessageDecoderResult) decoderResultLoose;
trafficOffloader.addEndOfFirstLineIndicator(decoderResult.initialLineLength());
trafficOffloader.addEndOfHeadersIndicator(decoderResult.headerSize());
metricsLogger.atSuccess(MetricsEvent.RECEIVED_REQUEST_COMPONENT)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()).emit();

if (requestParsingHandler.isDone) {
var httpRequest = requestParsingHandler.resetCurrentRequest();
if (shouldCapture) {
var decoderResultLoose = httpRequest.decoderResult();
if (decoderResultLoose instanceof HttpMessageDecoderResult) {
var decoderResult = (HttpMessageDecoderResult) decoderResultLoose;
trafficOffloader.addEndOfFirstLineIndicator(decoderResult.initialLineLength());
trafficOffloader.addEndOfHeadersIndicator(decoderResult.headerSize());
}
trafficOffloader.commitEndOfHttpMessageIndicator(timestamp);
}
trafficOffloader.commitEndOfHttpMessageIndicator(timestamp);
channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest);
channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest);
} else {
super.channelRead(ctx, msg);
}
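The reworked channelRead above is the heart of the over-committing fix: bytes are offloaded only while the capture predicate allows it, and if the predicate flips to DROP after some reads have already been committed, the offloader is told to cancel. A condensed sketch of just that branch logic (names come from the diff; the Netty plumbing and metrics calls are omitted):

    // Condensed from the diff above - not a drop-in replacement.
    void onBytes(Instant timestamp, ByteBuf bb) throws IOException {
        httpDecoderChannel.writeInbound(bb.retainedDuplicate()); // parse a copy
        if (requestParsingHandler.shouldCapture) {
            requestParsingHandler.liveReadObservationsInOffloader = true;
            trafficOffloader.addReadEvent(timestamp, bb); // offload as bytes arrive
        } else if (requestParsingHandler.liveReadObservationsInOffloader) {
            // Reads were already committed before the predicate could see the
            // headers; emit the tombstone so they won't be replayed.
            trafficOffloader.cancelCaptureForCurrentRequest(timestamp);
            requestParsingHandler.liveReadObservationsInOffloader = false;
        }
    }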

PassThruHttpHeaders.java
@@ -3,21 +3,37 @@
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import lombok.NonNull;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class PassThruHttpHeaders extends DefaultHttpHeaders {

private static final DefaultHttpHeaders HEADERS_TO_PRESERVE = makeHeadersToPreserve();
/**
* Use the HttpHeaders class because it does case insensitive matches.
*/
private final HttpHeaders mapWithCaseInsensitiveHeaders;

public static class HttpHeadersToPreserve {
private final HttpHeaders caseInsensitiveHeadersMap;
public HttpHeadersToPreserve(String... extraHeaderNames) {
caseInsensitiveHeadersMap = new DefaultHttpHeaders();
Stream.concat(Stream.of(HttpHeaderNames.CONTENT_LENGTH.toString(),
HttpHeaderNames.CONTENT_TRANSFER_ENCODING.toString(),
HttpHeaderNames.TRAILER.toString()),
Arrays.stream(extraHeaderNames))
.forEach(h->caseInsensitiveHeadersMap.add(h, ""));
}
}

private static DefaultHttpHeaders makeHeadersToPreserve() {
var h = new DefaultHttpHeaders(false);
h.add(HttpHeaderNames.CONTENT_LENGTH, "");
h.add(HttpHeaderNames.CONTENT_TRANSFER_ENCODING, "");
h.add(HttpHeaderNames.TRAILER, "");
return h;
public PassThruHttpHeaders(@NonNull HttpHeadersToPreserve headersToPreserve) {
this.mapWithCaseInsensitiveHeaders = headersToPreserve.caseInsensitiveHeadersMap;
}

private static boolean headerNameShouldBeTracked(CharSequence name) {
return HEADERS_TO_PRESERVE.contains(name);
private boolean headerNameShouldBeTracked(CharSequence name) {
return mapWithCaseInsensitiveHeaders.contains(name);
}

@Override
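A hedged sketch of the reworked preservation behavior. The filtering overrides themselves are collapsed out of the diff above, so the drop behavior for untracked headers is assumed rather than shown:

    import io.netty.handler.codec.http.HttpHeaderNames;

    static PassThruHttpHeaders exampleHeaders() {
        // No longer a fixed static set: each instance keeps the three
        // payload-parsing headers plus whatever the capture predicate needs.
        var toPreserve = new PassThruHttpHeaders.HttpHeadersToPreserve(
                HttpHeaderNames.USER_AGENT.toString());
        var headers = new PassThruHttpHeaders(toPreserve);
        headers.add(HttpHeaderNames.CONTENT_LENGTH, "42"); // tracked for payload parsing
        headers.add(HttpHeaderNames.USER_AGENT, "curl/8.0.1"); // tracked for the predicate
        headers.add(HttpHeaderNames.ACCEPT, "*/*"); // presumably dropped by the elided overrides
        return headers;
    }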