From 48e9dfcb274b3aaef9a13080329ba39824689ab1 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 6 Dec 2023 09:22:57 -0500 Subject: [PATCH 1/6] Add the ability to drop captures, while still forwarding the contents, for requests when a specific header matches some regex. Since captures are really on a connection (socket) basis and not a request one, there are additional complications. Data is captured as it is received. We may not know that we want to drop packets early enough. To compensate and to give the traffic stream enough details to differentiate this occurrence from a bug where packets were dropped, a new "RequestIntentionallyDropped" TrafficObservation has been introduced. This observation will only be used when data for a request has been included. That means that if we get a packet that has the entire HTTP preamble (first line + headers) that matches the drop filter, we won't capture anything at all. The --suppressCaptureForHeaderMatch argument has been added to the CaptureProxy. It expects two elements for a header name and a regex to match that header's value. When a request matches, it will be passed to the HTTP server, but will NOT be captured as per the above. Because the capture proxy only captures a handful of headers so that it can understand request boundaries, the predicate to scan additional headers must tweak the set of headers that the LoggingHttpRequestHandler's internal classes will preserve. 
Signed-off-by: Greg Schohn --- .../IChannelConnectionCaptureListener.java | 2 + ...eamChannelConnectionCaptureSerializer.java | 11 ++ .../src/main/proto/TrafficCaptureStream.proto | 3 + ...allyReliableLoggingHttpRequestHandler.java | 17 +-- .../HeaderValueFilteringCapturePredicate.java | 29 +++++ .../netty/LoggingHttpRequestHandler.java | 90 ++++++++------ .../netty/PassThruHttpHeaders.java | 34 ++++-- .../netty/RequestCapturePredicate.java | 30 +++++ ...ReliableLoggingHttpRequestHandlerTest.java | 110 +++++++++++++++--- .../trafficcapture/netty/SimpleRequests.java | 17 ++- .../proxyserver/CaptureProxy.java | 41 +++++-- .../netty/NettyScanningHttpProxy.java | 7 +- .../netty/ProxyChannelInitializer.java | 9 +- .../netty/NettyScanningHttpProxyTest.java | 4 +- 14 files changed, 317 insertions(+), 87 deletions(-) create mode 100644 TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/HeaderValueFilteringCapturePredicate.java create mode 100644 TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/RequestCapturePredicate.java diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java index e60a14f2c..fe8a1b8fd 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/IChannelConnectionCaptureListener.java @@ -78,4 +78,6 @@ default void commitEndOfHttpMessageIndicator(Instant timestamp) throws IOExcepti default CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { return CompletableFuture.completedFuture(null); } + + default void cancelCaptureForCurrentRequest(Instant timestamp) throws 
IOException {} } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 14c501d37..419f06060 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -13,6 +13,7 @@ import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication; import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.ReadSegmentObservation; +import org.opensearch.migrations.trafficcapture.protos.RequestIntentionallyDropped; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; @@ -204,6 +205,16 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO return future; } + @Override + public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException { + beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1); + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, + RequestIntentionallyDropped.getDefaultInstance()); + this.readObservationsAreWaitingForEom = false; + this.firstLineByteLength = -1; + this.headersByteLength = -1; + } + @Override public void addBindEvent(Instant timestamp, SocketAddress addr) throws IOException { // not implemented for this serializer. 
The v1.0 version of the replayer will ignore this type of observation diff --git a/TrafficCapture/captureProtobufs/src/main/proto/TrafficCaptureStream.proto b/TrafficCapture/captureProtobufs/src/main/proto/TrafficCaptureStream.proto index e10a14936..6f4032f46 100644 --- a/TrafficCapture/captureProtobufs/src/main/proto/TrafficCaptureStream.proto +++ b/TrafficCapture/captureProtobufs/src/main/proto/TrafficCaptureStream.proto @@ -41,6 +41,7 @@ message EndOfMessageIndication { optional int32 firstLineByteLength = 1; optional int32 headersByteLength = 2; } +message RequestIntentionallyDropped {} message TrafficObservation { google.protobuf.Timestamp ts = 1; @@ -61,6 +62,8 @@ message TrafficObservation { // having been committed to the stream. EndOfSegmentsIndication segmentEnd = 14; EndOfMessageIndication endOfMessageIndicator = 15; + + RequestIntentionallyDropped requestDropped = 16; } } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java index 7b5161ed6..b52658bd1 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandler.java @@ -4,6 +4,7 @@ import io.netty.handler.codec.http.HttpRequest; import io.netty.util.ReferenceCountUtil; import lombok.Lombok; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer; @@ -13,16 +14,18 @@ public class ConditionallyReliableLoggingHttpRequestHandler extends LoggingHttpRequestHandler { private final Predicate shouldBlockPredicate; - public 
ConditionallyReliableLoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader, - Predicate headerPredicateForWhenToBlock) { - super(trafficOffloader); + public ConditionallyReliableLoggingHttpRequestHandler(@NonNull IChannelConnectionCaptureSerializer trafficOffloader, + @NonNull RequestCapturePredicate requestCapturePredicate, + @NonNull Predicate headerPredicateForWhenToBlock) { + super(trafficOffloader, requestCapturePredicate); this.shouldBlockPredicate = headerPredicateForWhenToBlock; } @Override - protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) + protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, + boolean shouldCapture, HttpRequest httpRequest) throws Exception { - if (shouldBlockPredicate.test(httpRequest)) { + if (shouldCapture && shouldBlockPredicate.test(httpRequest)) { trafficOffloader.flushCommitAndResetStream(false).whenComplete((result, t) -> { if (t != null) { // This is a spot where we would benefit from having a behavioral policy that different users @@ -32,14 +35,14 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob ReferenceCountUtil.release(msg); } else { try { - super.channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest); + super.channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest); } catch (Exception e) { throw Lombok.sneakyThrow(e); } } }); } else { - super.channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest); + super.channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest); } } } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/HeaderValueFilteringCapturePredicate.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/HeaderValueFilteringCapturePredicate.java new file mode 100644 index 000000000..475e51e55 --- /dev/null +++ 
b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/HeaderValueFilteringCapturePredicate.java @@ -0,0 +1,29 @@ +package org.opensearch.migrations.trafficcapture.netty; + + +import io.netty.handler.codec.http.HttpRequest; + +import java.util.Map; +import java.util.Optional; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class HeaderValueFilteringCapturePredicate extends RequestCapturePredicate { + private final Map headerToPredicateRegexMap; + + public HeaderValueFilteringCapturePredicate(Map suppressCaptureHeaderPairs) { + super(new PassThruHttpHeaders.HttpHeadersToPreserve(suppressCaptureHeaderPairs.keySet() + .toArray(String[]::new))); + headerToPredicateRegexMap = suppressCaptureHeaderPairs.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, kvp->Pattern.compile(kvp.getValue()))); + } + + @Override + public CaptureDirective apply(HttpRequest request) { + return headerToPredicateRegexMap.entrySet().stream().anyMatch(kvp-> + Optional.ofNullable(request.headers().get(kvp.getKey())) + .map(v->kvp.getValue().matcher(v).matches()) + .orElse(false) + ) ? 
CaptureDirective.DROP : CaptureDirective.CAPTURE; + } +} diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java index de5b0256e..ca3e5075f 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/LoggingHttpRequestHandler.java @@ -15,6 +15,7 @@ import io.netty.handler.codec.http.LastHttpContent; import lombok.Getter; import lombok.Lombok; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.coreutils.MetricsAttributeKey; import org.opensearch.migrations.coreutils.MetricsEvent; @@ -28,6 +29,12 @@ public class LoggingHttpRequestHandler extends ChannelInboundHandlerAdapter { private static final MetricsLogger metricsLogger = new MetricsLogger("LoggingHttpRequestHandler"); static class SimpleHttpRequestDecoder extends HttpRequestDecoder { + private final PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve; + + public SimpleHttpRequestDecoder(@NonNull PassThruHttpHeaders.HttpHeadersToPreserve headersToPreserve) { + this.headersToPreserve = headersToPreserve; + } + /** * Override this so that the HttpHeaders object can be a cheaper one. 
PassThruHeaders * only stores a handful of headers that are required for parsing the payload portion @@ -37,7 +44,7 @@ static class SimpleHttpRequestDecoder extends HttpRequestDecoder { public HttpMessage createMessage(String[] initialLine) throws Exception { return new DefaultHttpRequest(HttpVersion.valueOf(initialLine[2]), HttpMethod.valueOf(initialLine[0]), initialLine[1] - , new PassThruHttpHeaders() + , new PassThruHttpHeaders(headersToPreserve) ); } } @@ -45,11 +52,25 @@ public HttpMessage createMessage(String[] initialLine) throws Exception { static class SimpleDecodedHttpRequestHandler extends ChannelInboundHandlerAdapter { @Getter private HttpRequest currentRequest; + final RequestCapturePredicate requestCapturePredicate; boolean isDone; + boolean shouldCapture; + boolean liveReadObservationsInOffloader; + + SimpleDecodedHttpRequestHandler(RequestCapturePredicate requestCapturePredicate) { + this.requestCapturePredicate = requestCapturePredicate; + this.currentRequest = null; + this.isDone = false; + this.shouldCapture = true; + liveReadObservationsInOffloader = false; + } + @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(@NonNull ChannelHandlerContext ctx, @NonNull Object msg) throws Exception { if (msg instanceof HttpRequest) { currentRequest = (HttpRequest) msg; + shouldCapture = RequestCapturePredicate.CaptureDirective.CAPTURE == + requestCapturePredicate.apply((HttpRequest) msg); } else if (msg instanceof HttpContent) { ((HttpContent)msg).release(); if (msg instanceof LastHttpContent) { @@ -61,9 +82,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } public HttpRequest resetCurrentRequest() { - isDone = false; + this.shouldCapture = true; + this.isDone = false; var old = currentRequest; - currentRequest = null; + this.currentRequest = null; + this.liveReadObservationsInOffloader = false; return old; } } @@ -71,25 +94,16 @@ public HttpRequest 
resetCurrentRequest() { protected final IChannelConnectionCaptureSerializer trafficOffloader; protected final EmbeddedChannel httpDecoderChannel; - protected final SimpleHttpRequestDecoder requestDecoder; - - public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader) { + public LoggingHttpRequestHandler(IChannelConnectionCaptureSerializer trafficOffloader, + @NonNull RequestCapturePredicate httpHeadersCapturePredicate) { this.trafficOffloader = trafficOffloader; - requestDecoder = new SimpleHttpRequestDecoder(); // as a field for easier debugging httpDecoderChannel = new EmbeddedChannel( - requestDecoder, - new SimpleDecodedHttpRequestHandler() + new SimpleHttpRequestDecoder(httpHeadersCapturePredicate.getHeadersRequiredForMatcher()), + new SimpleDecodedHttpRequestHandler(httpHeadersCapturePredicate) ); } - private HttpProcessedState parseHttpMessageParts(ByteBuf msg) { - httpDecoderChannel.writeInbound(msg); // Consume this outright, up to the caller to know what else to do - return getHandlerThatHoldsParsedHttpRequest().isDone ? 
- HttpProcessedState.FULL_MESSAGE : - HttpProcessedState.ONGOING; - } - private SimpleDecodedHttpRequestHandler getHandlerThatHoldsParsedHttpRequest() { return (SimpleDecodedHttpRequestHandler) httpDecoderChannel.pipeline().last(); } @@ -126,7 +140,8 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { super.handlerRemoved(ctx); } - protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, HttpRequest httpRequest) throws Exception { + protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Object msg, boolean shouldCapture, + HttpRequest httpRequest) throws Exception { super.channelRead(ctx, msg); metricsLogger.atSuccess(MetricsEvent.RECEIVED_FULL_HTTP_REQUEST) .setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()) @@ -137,25 +152,32 @@ protected void channelFinishedReadingAnHttpMessage(ChannelHandlerContext ctx, Ob @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { var timestamp = Instant.now(); - HttpProcessedState httpProcessedState; - { - var bb = ((ByteBuf) msg).retainedDuplicate(); + var requestParsingHandler = getHandlerThatHoldsParsedHttpRequest(); + var bb = ((ByteBuf) msg); + httpDecoderChannel.writeInbound(bb.retainedDuplicate()); // the ByteBuf is consumed/release by this method + var shouldCapture = requestParsingHandler.shouldCapture; + if (shouldCapture) { + requestParsingHandler.liveReadObservationsInOffloader = true; trafficOffloader.addReadEvent(timestamp, bb); - metricsLogger.atSuccess(MetricsEvent.RECEIVED_REQUEST_COMPONENT) - .setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()).emit(); - - httpProcessedState = parseHttpMessageParts(bb); // bb is consumed/release by this method + } else if (requestParsingHandler.liveReadObservationsInOffloader) { + trafficOffloader.cancelCaptureForCurrentRequest(timestamp); + requestParsingHandler.liveReadObservationsInOffloader = false; } - if 
(httpProcessedState == HttpProcessedState.FULL_MESSAGE) { - var httpRequest = getHandlerThatHoldsParsedHttpRequest().resetCurrentRequest(); - var decoderResultLoose = httpRequest.decoderResult(); - if (decoderResultLoose instanceof HttpMessageDecoderResult) { - var decoderResult = (HttpMessageDecoderResult) decoderResultLoose; - trafficOffloader.addEndOfFirstLineIndicator(decoderResult.initialLineLength()); - trafficOffloader.addEndOfHeadersIndicator(decoderResult.headerSize()); + metricsLogger.atSuccess(MetricsEvent.RECEIVED_REQUEST_COMPONENT) + .setAttribute(MetricsAttributeKey.CHANNEL_ID, ctx.channel().id().asLongText()).emit(); + + if (requestParsingHandler.isDone) { + var httpRequest = requestParsingHandler.resetCurrentRequest(); + if (shouldCapture) { + var decoderResultLoose = httpRequest.decoderResult(); + if (decoderResultLoose instanceof HttpMessageDecoderResult) { + var decoderResult = (HttpMessageDecoderResult) decoderResultLoose; + trafficOffloader.addEndOfFirstLineIndicator(decoderResult.initialLineLength()); + trafficOffloader.addEndOfHeadersIndicator(decoderResult.headerSize()); + } + trafficOffloader.commitEndOfHttpMessageIndicator(timestamp); } - trafficOffloader.commitEndOfHttpMessageIndicator(timestamp); - channelFinishedReadingAnHttpMessage(ctx, msg, httpRequest); + channelFinishedReadingAnHttpMessage(ctx, msg, shouldCapture, httpRequest); } else { super.channelRead(ctx, msg); } diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java index b238cfc5f..33953b55c 100644 --- a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/PassThruHttpHeaders.java @@ -3,21 +3,37 @@ import 
io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; +import lombok.NonNull; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; public class PassThruHttpHeaders extends DefaultHttpHeaders { - private static final DefaultHttpHeaders HEADERS_TO_PRESERVE = makeHeadersToPreserve(); + /** + * Use the HttpHeaders class because it does case insensitive matches. + */ + private final HttpHeaders mapWithCaseInsensitiveHeaders; + + public static class HttpHeadersToPreserve { + private final HttpHeaders caseInsensitiveHeadersMap; + public HttpHeadersToPreserve(String... extraHeaderNames) { + caseInsensitiveHeadersMap = new DefaultHttpHeaders(); + Stream.concat(Stream.of(HttpHeaderNames.CONTENT_LENGTH.toString(), + HttpHeaderNames.CONTENT_TRANSFER_ENCODING.toString(), + HttpHeaderNames.TRAILER.toString()), + Arrays.stream(extraHeaderNames)) + .forEach(h->caseInsensitiveHeadersMap.add(h, "")); + } + } - private static DefaultHttpHeaders makeHeadersToPreserve() { - var h = new DefaultHttpHeaders(false); - h.add(HttpHeaderNames.CONTENT_LENGTH, ""); - h.add(HttpHeaderNames.CONTENT_TRANSFER_ENCODING, ""); - h.add(HttpHeaderNames.TRAILER, ""); - return h; + public PassThruHttpHeaders(@NonNull HttpHeadersToPreserve headersToPreserve) { + this.mapWithCaseInsensitiveHeaders = headersToPreserve.caseInsensitiveHeadersMap; } - private static boolean headerNameShouldBeTracked(CharSequence name) { - return HEADERS_TO_PRESERVE.contains(name); + private boolean headerNameShouldBeTracked(CharSequence name) { + return mapWithCaseInsensitiveHeaders.contains(name); } @Override diff --git a/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/RequestCapturePredicate.java b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/RequestCapturePredicate.java new file mode 100644 index 
000000000..45d4241fa --- /dev/null +++ b/TrafficCapture/nettyWireLogging/src/main/java/org/opensearch/migrations/trafficcapture/netty/RequestCapturePredicate.java @@ -0,0 +1,30 @@ +package org.opensearch.migrations.trafficcapture.netty; + +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpRequest; +import lombok.Getter; + +import java.util.function.Function; + +public class RequestCapturePredicate implements Function { + + public enum CaptureDirective { + CAPTURE, DROP + } + + @Getter + protected final PassThruHttpHeaders.HttpHeadersToPreserve headersRequiredForMatcher; + + public RequestCapturePredicate() { + this(new PassThruHttpHeaders.HttpHeadersToPreserve()); + } + + public RequestCapturePredicate(PassThruHttpHeaders.HttpHeadersToPreserve incoming) { + this.headersRequiredForMatcher = incoming; + } + + @Override + public CaptureDirective apply(HttpRequest request) { + return CaptureDirective.CAPTURE; + } +} diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java index 4879ee983..b0e87c6db 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java @@ -2,6 +2,7 @@ import com.google.protobuf.CodedOutputStream; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import lombok.AllArgsConstructor; import lombok.SneakyThrows; @@ -18,14 +19,16 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import java.io.ByteArrayInputStream; -import 
java.io.FileOutputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.SequenceInputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -34,16 +37,21 @@ @Slf4j public class ConditionallyReliableLoggingHttpRequestHandlerTest { - @AllArgsConstructor - static class StreamManager extends OrderedStreamLifecyleManager { - AtomicReference byteBufferAtomicReference; + static class TestStreamManager extends OrderedStreamLifecyleManager implements AutoCloseable { + AtomicReference byteBufferAtomicReference = new AtomicReference<>(); AtomicInteger flushCount = new AtomicInteger(); + ByteArrayOutputStream collectedSerializedTrafficStream = new ByteArrayOutputStream(); @Override public CodedOutputStreamAndByteBufferWrapper createStream() { return new CodedOutputStreamAndByteBufferWrapper(1024*1024); } + @Override + public void close() throws Exception { + collectedSerializedTrafficStream.close(); + } + @SneakyThrows @Override public CompletableFuture @@ -58,6 +66,7 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { cos.flush(); byteBufferAtomicReference.set(osh.getByteBuffer().flip().asReadOnlyBuffer()); log.trace("byteBufferAtomicReference.get="+byteBufferAtomicReference.get()); + //collectedSerializedTrafficStream.write(byteBufferAtomicReference.get().array()); return CompletableFuture.completedFuture(flushCount.incrementAndGet()); } @@ -66,13 +75,11 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer channelWriter) throws IOException { - AtomicReference outputByteBuffer = new AtomicReference<>(); - 
AtomicInteger flushCount = new AtomicInteger(); - var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", - new StreamManager(outputByteBuffer, flushCount)); + var streamManager = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "c", streamManager); EmbeddedChannel channel = new EmbeddedChannel( - new ConditionallyReliableLoggingHttpRequestHandler(offloader, x->true)); // true: block every request + new ConditionallyReliableLoggingHttpRequestHandler(offloader, new RequestCapturePredicate(), x->true)); // true: block every request channelWriter.accept(channel); // we wrote the correct data to the downstream handler/channel @@ -82,10 +89,10 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer 0 && trafficStream.getSubStream(0).hasRead()); var combinedTrafficPacketsSteam = @@ -94,7 +101,7 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumernew ByteArrayInputStream(to.getRead().getData().toByteArray())) .collect(Collectors.toList()))); Assertions.assertArrayEquals(fullTrafficBytes, combinedTrafficPacketsSteam.readAllBytes()); - Assertions.assertEquals(1, flushCount.get()); + Assertions.assertEquals(1, streamManager.flushCount.get()); } private static byte[] consumeIntoArray(ByteBuf m) { @@ -109,9 +116,7 @@ private static byte[] consumeIntoArray(ByteBuf m) { public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); var bb = TestUtilities.getByteBuf(fullTrafficBytes, usePool); - writeMessageAndVerify(fullTrafficBytes, w -> { - w.writeInbound(bb); - }); + writeMessageAndVerify(fullTrafficBytes, w -> w.writeInbound(bb)); log.info("buf.refCnt="+bb.refCnt()); } @@ -119,12 +124,81 @@ public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) th @ValueSource(booleans = {true, false}) public void 
testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); - writeMessageAndVerify(fullTrafficBytes, w -> { - for (int i=0; i getSingleByteAtATimeWriter(boolean usePool, byte[] fullTrafficBytes) { + return w -> { + for (int i = 0; i< fullTrafficBytes.length; ++i) { var singleByte = TestUtilities.getByteBuf(Arrays.copyOfRange(fullTrafficBytes, i, i+1), usePool); w.writeInbound(singleByte); } - }); + }; + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throws Exception { + var streamMgr = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); + + var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", ".*uploader.*")); + EmbeddedChannel channel = new EmbeddedChannel( + new ConditionallyReliableLoggingHttpRequestHandler(offloader, headerCapturePredicate, x->true)); + getWriter(singleBytes, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + getWriter(singleBytes, true, SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8)).accept(channel); + var requestBytes = (SimpleRequests.HEALTH_CHECK + SimpleRequests.SMALL_POST).getBytes(StandardCharsets.UTF_8); + + // we wrote the correct data to the downstream handler/channel + var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m->new ByteArrayInputStream(consumeIntoArray((ByteBuf)m))) + .collect(Collectors.toList()))) + .readAllBytes(); + log.info("outputdata = " + new String(outputData, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(requestBytes, outputData); + + Assertions.assertNotNull(streamMgr.byteBufferAtomicReference, + "This would be null if the handler didn't block until the output was written"); + // we wrote 
the correct data to the offloaded stream + var trafficStream = TrafficStream.parseFrom(streamMgr.byteBufferAtomicReference.get()); + Assertions.assertTrue(trafficStream.getSubStreamCount() > 0 && + trafficStream.getSubStream(0).hasRead()); + Assertions.assertEquals(1, streamMgr.flushCount.get()); + var observations = trafficStream.getSubStreamList(); + if (singleBytes) { + var sawRequestDropped = new AtomicBoolean(false); + var observationsAfterDrop = observations.stream().dropWhile(o->{ + var wasDrop = o.hasRequestDropped(); + sawRequestDropped.compareAndSet(false, wasDrop); + return !sawRequestDropped.get() || wasDrop; + }).collect(Collectors.toList()); + var combinedTrafficPacketsSteam = + new SequenceInputStream(Collections.enumeration(observationsAfterDrop.stream() + .filter(to->to.hasRead()) + .map(to->new ByteArrayInputStream(to.getRead().getData().toByteArray())) + .collect(Collectors.toList()))); + Assertions.assertArrayEquals(SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8), + combinedTrafficPacketsSteam.readAllBytes()); + } else { + var combinedTrafficPacketsSteam = + new SequenceInputStream(Collections.enumeration(observations.stream() + .filter(to->to.hasRead()) + .map(to->new ByteArrayInputStream(to.getRead().getData().toByteArray())) + .collect(Collectors.toList()))); + var reconstitutedTrafficStreamReads = combinedTrafficPacketsSteam.readAllBytes(); + log.info("reconstitutedTrafficStreamReads="+ + new String(reconstitutedTrafficStreamReads, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8), + reconstitutedTrafficStreamReads); + } + } + + private Consumer getWriter(boolean singleBytes, boolean usePool, byte[] bytes) { + if (singleBytes) { + return getSingleByteAtATimeWriter(usePool, bytes); + } else { + return w -> w.writeInbound(Unpooled.wrappedBuffer(bytes)); + } } @ParameterizedTest diff --git 
a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/SimpleRequests.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/SimpleRequests.java index d1c1d32d2..fd831f1ac 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/SimpleRequests.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/SimpleRequests.java @@ -1,10 +1,17 @@ package org.opensearch.migrations.trafficcapture.netty; public class SimpleRequests { - public static String SMALL_POST = "POST / HTTP/1.1\n" + - "Host: localhost\n" + - "Content-Type: application/x-www-form-urlencoded\n" + - "Content-Length: 27\n" + - "\n" + + public static String SMALL_POST = "POST / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Content-Type: application/x-www-form-urlencoded\r\n" + + "Content-Length: 16\r\n" + + "\r\n" + + "FAKE_UPLOAD_DATA"; + + public static String HEALTH_CHECK = "POST / HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "User-Agent: uploader\r\n" + + "Content-Length: 27\r\n" + + "\r\n" + "field1=value1&field2=value2"; } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index 504cdf275..89f7b7cb1 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -23,6 +23,7 @@ import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer; import org.opensearch.migrations.trafficcapture.StreamLifecycleManager; import 
org.opensearch.migrations.trafficcapture.kafkaoffloader.KafkaCaptureFactory; +import org.opensearch.migrations.trafficcapture.netty.HeaderValueFilteringCapturePredicate; import org.opensearch.migrations.trafficcapture.proxyserver.netty.BacksideConnectionPool; import org.opensearch.migrations.trafficcapture.proxyserver.netty.NettyScanningHttpProxy; import org.opensearch.security.ssl.DefaultSecurityKeyStore; @@ -36,8 +37,12 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; @@ -132,6 +137,13 @@ static class Parameters { description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be forwarded." + "If this is not provided, metrics will not be sent to a collector.") String otelCollectorEndpoint; + @Parameter(required = false, + names = "--suppressCaptureForHeaderMatch", + arity = 2, + description = "The header name (which will be interpreted in a case-insensitive manner) and a regex " + + "pattern. When the incoming request has a header that matches the regex, it will be passed " + + "through to the service but will NOT be captured. E.g. 
user-agent 'healthcheck'.") + private List suppressCaptureHeaderPairs = new ArrayList<>(); } static Parameters parseArgs(String[] args) { @@ -274,6 +286,14 @@ private static SslContext loadBacksideSslContext(URI serverUri, boolean allowIns } } + private static Map convertPairListToMap(List list) { + var map = new TreeMap(); + for (int i=0; inew DefaultSecurityKeyStore(getSettings(sslConfigFile), Paths.get(sslConfigFile).toAbsolutePath().getParent())); - sksOp.ifPresent(x->x.initHttpSSLConfig()); + sksOp.ifPresent(DefaultSecurityKeyStore::initHttpSSLConfig); var proxy = new NettyScanningHttpProxy(params.frontsidePort); try { var pooledConnectionTimeout = params.destinationConnectionPoolSize == 0 ? Duration.ZERO : @@ -297,14 +317,17 @@ public static void main(String[] args) throws InterruptedException, IOException var backsideConnectionPool = new BacksideConnectionPool(backsideUri, loadBacksideSslContext(backsideUri, params.allowInsecureConnectionsToBackside), params.destinationConnectionPoolSize, pooledConnectionTimeout); - proxy.start(backsideConnectionPool, params.numThreads, - sksOp.map(sks -> (Supplier) () -> { - try { - return sks.createHTTPSSLEngine(); - } catch (Exception e) { - throw Lombok.sneakyThrow(e); - } - }).orElse(null), getConnectionCaptureFactory(params)); + Supplier sslEngineSupplier = sksOp.map(sks -> (Supplier) () -> { + try { + return sks.createHTTPSSLEngine(); + } catch (Exception e) { + throw Lombok.sneakyThrow(e); + } + }).orElse(null); + var headerCapturePredicate = + new HeaderValueFilteringCapturePredicate(convertPairListToMap(params.suppressCaptureHeaderPairs)); + proxy.start(backsideConnectionPool, params.numThreads, sslEngineSupplier, + getConnectionCaptureFactory(params), headerCapturePredicate); } catch (Exception e) { log.atError().setCause(e).setMessage("Caught exception while setting up the server and rethrowing").log(); throw e; diff --git 
a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java index 4ad2eaf38..e6ed7e28e 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxy.java @@ -7,7 +7,9 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.concurrent.DefaultThreadFactory; +import lombok.NonNull; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; +import org.opensearch.migrations.trafficcapture.netty.RequestCapturePredicate; import javax.net.ssl.SSLEngine; import java.util.function.Supplier; @@ -29,7 +31,8 @@ public int getProxyPort() { public void start(BacksideConnectionPool backsideConnectionPool, int numThreads, Supplier sslEngineSupplier, - IConnectionCaptureFactory connectionCaptureFactory) throws InterruptedException { + IConnectionCaptureFactory connectionCaptureFactory, + @NonNull RequestCapturePredicate requestCapturePredicate) throws InterruptedException { bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("captureProxyPoolBoss")); workerGroup = new NioEventLoopGroup(numThreads, new DefaultThreadFactory("captureProxyPoolWorker")); ServerBootstrap serverBootstrap = new ServerBootstrap(); @@ -37,7 +40,7 @@ public void start(BacksideConnectionPool backsideConnectionPool, mainChannel = serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ProxyChannelInitializer<>(backsideConnectionPool, sslEngineSupplier, - connectionCaptureFactory)) + connectionCaptureFactory, 
requestCapturePredicate)) .childOption(ChannelOption.AUTO_READ, false) .bind(proxyPort).sync().channel(); } catch (Exception e) { diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java index 3c8444955..37cc20ce5 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ProxyChannelInitializer.java @@ -5,8 +5,10 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.ssl.SslHandler; +import lombok.NonNull; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; import org.opensearch.migrations.trafficcapture.netty.ConditionallyReliableLoggingHttpRequestHandler; +import org.opensearch.migrations.trafficcapture.netty.RequestCapturePredicate; import org.opensearch.migrations.trafficcapture.netty.LoggingHttpResponseHandler; import javax.net.ssl.SSLEngine; @@ -18,12 +20,15 @@ public class ProxyChannelInitializer extends ChannelInitializer connectionCaptureFactory; private final Supplier sslEngineProvider; private final BacksideConnectionPool backsideConnectionPool; + private final RequestCapturePredicate requestCapturePredicate; public ProxyChannelInitializer(BacksideConnectionPool backsideConnectionPool, Supplier sslEngineSupplier, - IConnectionCaptureFactory connectionCaptureFactory) { + IConnectionCaptureFactory connectionCaptureFactory, + @NonNull RequestCapturePredicate requestCapturePredicate) { this.backsideConnectionPool = backsideConnectionPool; this.sslEngineProvider = sslEngineSupplier; this.connectionCaptureFactory = 
connectionCaptureFactory; + this.requestCapturePredicate = requestCapturePredicate; } public boolean shouldGuaranteeMessageOffloading(HttpRequest httpRequest) { @@ -44,7 +49,7 @@ protected void initChannel(SocketChannel ch) throws IOException { var offloader = connectionCaptureFactory.createOffloader(ch.id().asLongText()); ch.pipeline().addLast(new LoggingHttpResponseHandler(offloader)); ch.pipeline().addLast(new ConditionallyReliableLoggingHttpRequestHandler(offloader, - this::shouldGuaranteeMessageOffloading)); + requestCapturePredicate, this::shouldGuaranteeMessageOffloading)); ch.pipeline().addLast(new FrontsideHandler(backsideConnectionPool)); } } diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java index 009e8f744..c405fafe6 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/NettyScanningHttpProxyTest.java @@ -12,6 +12,7 @@ import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory; import org.opensearch.migrations.trafficcapture.InMemoryConnectionCaptureFactory; +import org.opensearch.migrations.trafficcapture.netty.RequestCapturePredicate; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import java.io.ByteArrayOutputStream; @@ -197,7 +198,8 @@ private static String makeTestRequestViaClient(SimpleHttpClientForTesting client try { var connectionPool = new BacksideConnectionPool(testServerUri, null, 10, Duration.ofSeconds(10)); - nshp.get().start(connectionPool, 1, null, 
connectionCaptureFactory); + nshp.get().start(connectionPool, 1, null, connectionCaptureFactory, + new RequestCapturePredicate()); System.out.println("proxy port = " + port); } catch (InterruptedException e) { Thread.currentThread().interrupt(); From 7c84f94ab99787f55366558c02b52c67a0009ebe Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 6 Dec 2023 09:40:07 -0500 Subject: [PATCH 2/6] Add the ability to generate streams that include the newly added RequestIntentionallyDropped observation. Along the way, I also fixed some issues with tests. One was a bug within the test (RawPackets weren't being properly compared) and the other was a more serious issue that could have resulted in over-committing messages before they were completely handled. The wrong commit came about because the Accumulation's RequestResponsePacketPair ('RRPair') needs a TrafficStreamKey for some maintenance (like closing the stream). Since that RRPair also tracks which TrafficStreamKeys should be committed once the message has been fully handled, there was an optimization to just use the first traffic stream being held as the key for the maintenance. To do that, the TrafficStreamKey was passed into the RRPair constructor, which immediately added it to the trafficStreamsBeingHeld list. While TrafficStreamKeys are routinely added to the RRPairs, they're otherwise only added AFTER we've done all of the processing for all of the TrafficStream's observations. That behavior is what protects the completed messages passed to the onFullDataReceived callback from advertising that they have ownership (holding) of the TrafficStreamKey that may still contain observations for additional, yet to be processed, requests. Notice that callbacks' awareness of which TrafficStreams are held may often be looser than what is required. However, close observations and expirations will cause the connection's last observed TrafficStreamKeys to be committed.
Therefore, to preserve 'at-least-once' guaranteed delivery, the RRPair class no longer appends the constructor's incoming key to the trafficStreamKeysBeingHeld list and instead tracks it as a separate field (and concern). Signed-off-by: Greg Schohn --- .../replay/RequestResponsePacketPair.java | 8 ++--- .../replay/datatypes/RawPackets.java | 17 +++++++++- ...afficToHttpTransactionAccumulatorTest.java | 7 ++++ .../replay/TrafficStreamGenerator.java | 33 +++++++++++++------ 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java index 9a2f085fe..80b424748 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java @@ -23,17 +23,17 @@ public enum ReconstructionStatus { HttpMessageAndTimestamp requestData; HttpMessageAndTimestamp responseData; + final ITrafficStreamKey trafficStreamKey; List trafficStreamKeysBeingHeld; ReconstructionStatus completionStatus; - public RequestResponsePacketPair(ITrafficStreamKey startingAtTrafficStreamKey) { + public RequestResponsePacketPair(@NonNull ITrafficStreamKey startingAtTrafficStreamKey) { this.trafficStreamKeysBeingHeld = new ArrayList<>(); - this.trafficStreamKeysBeingHeld.add(startingAtTrafficStreamKey); + trafficStreamKey = startingAtTrafficStreamKey; } @NonNull ITrafficStreamKey getBeginningTrafficStreamKey() { - assert trafficStreamKeysBeingHeld != null && !trafficStreamKeysBeingHeld.isEmpty(); - return trafficStreamKeysBeingHeld.get(0); + return trafficStreamKey; } public void addRequestData(Instant packetTimeStamp, byte[] data) { diff --git 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java index e703fd6e3..e9e82b9ee 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/RawPackets.java @@ -3,7 +3,22 @@ import lombok.EqualsAndHashCode; import java.util.ArrayList; +import java.util.Arrays; + -@EqualsAndHashCode(callSuper = true) public class RawPackets extends ArrayList { + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof RawPackets)) { return false; } + RawPackets that = (RawPackets) o; + if (size() != that.size()) { return false; } + + for (int i = 0; i < size(); i++) { + if (!Arrays.equals(get(i), that.get(i))) { + return false; + } + } + return true; + } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java index 53b97d41c..669d6c5ba 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java @@ -55,6 +55,7 @@ public class SimpleCapturedTrafficToHttpTransactionAccumulatorTest { enum OffloaderCommandType { Read, EndOfMessage, + DropRequest, Write, Flush } @@ -70,6 +71,9 @@ public static ObservationDirective read(int i) { public static ObservationDirective eom() { return new ObservationDirective(OffloaderCommandType.EndOfMessage, 0); } + public static ObservationDirective 
cancelOffload() { + return new ObservationDirective(OffloaderCommandType.DropRequest, 0); + } public static ObservationDirective write(int i) { return new ObservationDirective(OffloaderCommandType.Write, i); } @@ -119,6 +123,9 @@ private static void serializeEvent(IChannelConnectionCaptureSerializer offloader case Flush: offloader.flushCommitAndResetStream(false); return; + case DropRequest: + offloader.cancelCaptureForCurrentRequest(Instant.EPOCH); + return; default: throw new IllegalStateException("Unknown directive type: " + directive.offloaderCommandType); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java index 4709b2f2c..67b49ebbf 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java @@ -1,6 +1,5 @@ package org.opensearch.migrations.replay; -import io.vavr.Tuple2; import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -33,9 +32,12 @@ public class TrafficStreamGenerator { public static final int MAX_READS_IN_REQUEST = 5; public static final int MAX_WRITES_IN_RESPONSE = 5; public static final List RANDOM_GENERATOR_SEEDS_FOR_SUFFICIENT_TRAFFIC_VARIANCE = List.of( - -1155869325, 892128508, 155629808, 1429008869, -1465154083, -1242363800, 26273138, - 1705850753, -1956122223, -193570837, 1558626465, 1248685248, -1292756720, -3507139, 929459541, - 474550272, -957816454, -1418261474, 431108934, 1601212083, 1788602357, 1722788072, 1421653156); + -1155869325, 892128508, 155629808, 1429008869, -138487339, 26273138, 685382526, + -226796111, -270230103, 1705850753, -1978864692, -836540342, 1181244667,-193570837, + -1617640095, -1359243304, -1973979577, -67333094, -2129721489, 
-2114584541, -1121163101, + 866345822, 1302951949, 30280569, -1199907813, 34574822, 2109003795, -1349584475, -2050877083, + 160359681, -1345969040, -20026097, 793184536, 834861033 + ); public enum ObservationType { Read(0), @@ -45,7 +47,8 @@ public enum ObservationType { Write(4), WriteSegment(5), EndOfWriteSegment(6), - Close(7); + RequestDropped(7), + Close(8); private final int intValue; @@ -72,7 +75,7 @@ public static Stream valueStream() { * a bit more work to fully implement. For now, the count parameter is ignored. */ private static int makeClassificationValue(ObservationType ot1, ObservationType ot2, Integer count) { - return (((0 * CLASSIFY_COMPONENT_INT_SHIFT) + ot1.intValue) * CLASSIFY_COMPONENT_INT_SHIFT) + ot2.intValue; + return ((ot1.intValue) * CLASSIFY_COMPONENT_INT_SHIFT) + ot2.intValue; } static String classificationToString(int c) { @@ -100,6 +103,8 @@ private static Optional getTypeFromObservation(TrafficObservati return Optional.empty(); } else if (trafficObservation.hasClose()) { return Optional.of(ObservationType.Close); + } else if (trafficObservation.hasRequestDropped()) { + return Optional.of(ObservationType.RequestDropped); } else { throw new IllegalStateException("unknown traffic observation: " + trafficObservation); } @@ -117,6 +122,8 @@ private static ObservationType getTypeFromObservation(TrafficObservation traffic case Write: case WriteSegment: return ObservationType.EndOfWriteSegment; + case RequestDropped: + return ObservationType.RequestDropped; default: throw new IllegalStateException("previous observation type doesn't match expected possibilities: " + lastObservationType); @@ -174,13 +181,17 @@ private static T supplyRandomly(Random r, double p1, Supplier supplier1, return (r.nextDouble() <= p1) ? 
supplier1.get() : supplier2.get(); } - private static void fillCommandsAndSizes(int bufferSize, Random r, double flushLikelihood, int bufferBound, + private static void fillCommandsAndSizes(Random r, double cancelRequestLikelihood, double flushLikelihood, + int bufferBound, List commands, List sizes) { var numTransactions = r.nextInt(MAX_COMMANDS_IN_CONNECTION); for (int i=numTransactions; i>0; --i) { addCommands(r, flushLikelihood, r.nextInt(MAX_READS_IN_REQUEST)+1, commands, sizes, ()-> SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.read(r.nextInt(bufferBound))); + if (r.nextDouble() <= cancelRequestLikelihood) { + commands.add(SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.cancelOffload()); + } if (r.nextDouble() <= flushLikelihood) { commands.add(SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.flush()); } @@ -204,7 +215,8 @@ private static TrafficStream[] fillCommandsAndSizesForSeed(long rSeed, .setMessage(()->String.format("bufferSize=%d bufferBound=%d maxPossibleReads=%d maxPossibleWrites=%d", bufferSize, bufferBound, MAX_READS_IN_REQUEST, MAX_WRITES_IN_RESPONSE)) .log(); - fillCommandsAndSizes(bufferSize, r2, Math.pow(r2.nextDouble(),2.0), bufferBound, commands, sizes); + var flushLikelihood = Math.pow(r2.nextDouble(),2.0); + fillCommandsAndSizes(r2, flushLikelihood/4, flushLikelihood, bufferBound, commands, sizes); return SimpleCapturedTrafficToHttpTransactionAccumulatorTest.makeTrafficStreams(bufferSize, (int) rSeed, commands); } @@ -222,7 +234,8 @@ static HashSet getPossibleTests() { ObservationType.EOM, List.of(ObservationType.EndOfReadSegment, ObservationType.EndOfWriteSegment), ObservationType.Write, List.of(ObservationType.EndOfReadSegment, ObservationType.EndOfWriteSegment), ObservationType.WriteSegment, List.of(ObservationType.EndOfReadSegment), - ObservationType.EndOfWriteSegment, List.of(ObservationType.EndOfReadSegment)) + ObservationType.EndOfWriteSegment, 
List.of(ObservationType.EndOfReadSegment), + ObservationType.RequestDropped, List.of(ObservationType.EndOfReadSegment, ObservationType.EndOfWriteSegment)) .entrySet().stream() .flatMap(kvp->kvp.getValue().stream().map(v->makeClassificationValue(kvp.getKey(), v,0))) .collect(Collectors.toSet()); @@ -230,7 +243,7 @@ static HashSet getPossibleTests() { ObservationType.valueStream() .flatMap(ot1->ObservationType.valueStream().map(ot2->makeClassificationValue(ot1, ot2, 0))) .filter(i->!impossibleTransitions.contains(i)) - .forEach(i->possibilities.add(i)); + .forEach(possibilities::add); return possibilities; } From 394cd5d091bd3edad34c909af76b7fb936f52fa9 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 6 Dec 2023 13:23:17 -0500 Subject: [PATCH 3/6] Stop emitting a spurious log message that was printed for every connection when it was closed. The message began "Work items are still remaining for this connection..." because the close() task that was running was remaining and that alone could trigger the log message. Now the type of task is passed around with the task itself so that we can do a smarter check. Notice that there may be other work scheduled for the connection/eventLoop that we aren't able to track in this class right now. See https://opensearch.atlassian.net/browse/MIGRATIONS-1447. 
Signed-off-by: Greg Schohn --- .../replay/RequestSenderOrchestrator.java | 63 +++++++++++-------- .../replay/datatypes/ChannelTask.java | 11 ++++ .../replay/datatypes/ChannelTaskType.java | 5 ++ .../datatypes/ConnectionReplaySession.java | 2 +- .../TimeToResponseFulfillmentFutureMap.java | 21 +++++-- ...imeToResponseFulfillmentFutureMapTest.java | 10 +-- 6 files changed, 75 insertions(+), 37 deletions(-) create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTask.java create mode 100644 TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTaskType.java diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java index df6851652..1e1cd0935 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestSenderOrchestrator.java @@ -5,9 +5,11 @@ import io.netty.channel.EventLoop; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumer; +import org.opensearch.migrations.replay.datatypes.ChannelTaskType; import org.opensearch.migrations.replay.datatypes.ConnectionReplaySession; import org.opensearch.migrations.replay.datatypes.ISourceTrafficChannelKey; import org.opensearch.migrations.replay.datatypes.IndexedChannelInteraction; +import org.opensearch.migrations.replay.datatypes.ChannelTask; import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture; @@ -41,6 +43,11 @@ public RequestSenderOrchestrator(ClientConnectionPool 
clientConnectionPool) { new StringTrackableCompletableFuture(new CompletableFuture<>(), ()->"waiting for final signal to confirm processing work has finished"); log.atDebug().setMessage(()->"Scheduling work for "+channelKey+" at time "+timestamp).log(); + // this method doesn't use the scheduling that scheduleRequest and scheduleClose use because + // doing work associated with a connection is considered to be preprocessing work independent + // of the underlying network connection itself, so it's fair to be able to do this without + // first needing to wait for a connection to succeed. In fact, making them more independent + // means that the work item being enqueued is less likely to cause a connection timeout. connectionSession.eventLoop.schedule(()-> task.get().map(f->f.whenComplete((v,t) -> { if (t!=null) { @@ -78,12 +85,12 @@ public StringTrackableCompletableFuture scheduleClose(ISourceTrafficChanne finalTunneledResponse, channelFutureAndRequestSchedule-> scheduleOnConnectionReplaySession(channelKey, channelInteractionNum, channelFutureAndRequestSchedule, - finalTunneledResponse, timestamp, "close", () -> { + finalTunneledResponse, timestamp, + new ChannelTask(ChannelTaskType.CLOSE, () -> { log.trace("Closing client connection " + channelInteraction); clientConnectionPool.closeConnection(channelKey.getConnectionId()); finalTunneledResponse.future.complete(null); - }) - ); + }))); return finalTunneledResponse; } @@ -118,13 +125,13 @@ public StringTrackableCompletableFuture scheduleClose(ISourceTrafficChanne .getNow(null).channel(); runAfterChannelSetup(channelFutureAndRequestSchedule, finalTunneledResponse, - cffr -> { - cffr.scheduleSequencer.add(channelInteractionNumber, + replaySession -> { + replaySession.scheduleSequencer.add(channelInteractionNumber, () -> successFn.accept(channelFutureAndRequestSchedule), x -> x.run()); - if (cffr.scheduleSequencer.hasPending()) { + if (replaySession.scheduleSequencer.hasPending()) { 
log.atDebug().setMessage(()->"Sequencer for "+channelKey+ - " = "+cffr.scheduleSequencer).log(); + " = "+replaySession.scheduleSequencer).log(); } }); }) @@ -143,32 +150,16 @@ public StringTrackableCompletableFuture scheduleClose(ISourceTrafficChanne private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey channelKey, int channelInteractionIdx, ConnectionReplaySession channelFutureAndRequestSchedule, StringTrackableCompletableFuture futureToBeCompletedByTask, - Instant atTime, String activityNameForLogging, Runnable task) { + Instant atTime, ChannelTask task) { var channelInteraction = new IndexedChannelInteraction(channelKey, channelInteractionIdx); - log.atInfo().setMessage(()->channelInteraction + " scheduling " + activityNameForLogging + " at " + atTime).log(); + log.atInfo().setMessage(()->channelInteraction + " scheduling " + task.kind + " at " + atTime).log(); var schedule = channelFutureAndRequestSchedule.schedule; var eventLoop = channelFutureAndRequestSchedule.getInnerChannelFuture().channel().eventLoop(); - futureToBeCompletedByTask.map(f->f.whenComplete((v,t)-> { - var itemStartTimeOfPopped = schedule.removeFirstItem(); - assert atTime.equals(itemStartTimeOfPopped): - "Expected to have popped the item to match the start time for the responseFuture that finished"; - log.atDebug().setMessage(()->channelInteraction.toString() + " responseFuture completed - checking " - + schedule + " for the next item to schedule").log(); - Optional.ofNullable(schedule.peekFirstItem()).ifPresent(kvp-> { - var sf = eventLoop.schedule(kvp.getValue(), getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS); - sf.addListener(sfp->{ - if (!sfp.isSuccess()) { - log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log(); - } - }); - }); - }), ()->""); - if (schedule.isEmpty()) { var scheduledFuture = - eventLoop.schedule(task, getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS); + eventLoop.schedule(task.runnable, 
getDelayFromNowMs(atTime), TimeUnit.MILLISECONDS); scheduledFuture.addListener(f->{ if (!f.isSuccess()) { log.atError().setCause(f.cause()).setMessage(()->"Error scheduling task").log(); @@ -184,6 +175,23 @@ private void scheduleOnConnectionReplaySession(ISourceTrafficChannelKey chan schedule.appendTask(atTime, task); log.atTrace().setMessage(()->channelInteraction + " added a scheduled event at " + atTime + "... " + schedule).log(); + + futureToBeCompletedByTask.map(f->f.whenComplete((v,t)-> { + var itemStartTimeOfPopped = schedule.removeFirstItem(); + assert atTime.equals(itemStartTimeOfPopped): + "Expected to have popped the item to match the start time for the responseFuture that finished"; + log.atDebug().setMessage(()->channelInteraction.toString() + " responseFuture completed - checking " + + schedule + " for the next item to schedule").log(); + Optional.ofNullable(schedule.peekFirstItem()).ifPresent(kvp-> { + var runnable = kvp.getValue().runnable; + var sf = eventLoop.schedule(runnable, getDelayFromNowMs(kvp.getKey()), TimeUnit.MILLISECONDS); + sf.addListener(sfp->{ + if (!sfp.isSuccess()) { + log.atWarn().setCause(sfp.cause()).setMessage(()->"Scheduled future was not successful").log(); + } + }); + }); + }), ()->""); } private void scheduleSendOnConnectionReplaySession(UniqueReplayerRequestKey requestKey, @@ -197,7 +205,8 @@ private void scheduleSendOnConnectionReplaySession(UniqueReplayerRequestKey requ packetReceiverRef), eventLoop, packets.iterator(), start, interval, new AtomicInteger(), responseFuture); scheduleOnConnectionReplaySession(requestKey.trafficStreamKey, requestKey.getSourceRequestIndex(), - channelFutureAndRequestSchedule, responseFuture, start, "send", packetSender); + channelFutureAndRequestSchedule, responseFuture, start, + new ChannelTask(ChannelTaskType.TRANSMIT, packetSender)); } private void runAfterChannelSetup(ConnectionReplaySession channelFutureAndItsFutureRequests, diff --git 
a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTask.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTask.java new file mode 100644 index 000000000..f8a43945f --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTask.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.replay.datatypes; + +import lombok.AllArgsConstructor; +import lombok.Getter; + +@AllArgsConstructor +@Getter +public class ChannelTask { + public final ChannelTaskType kind; + public final Runnable runnable; +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTaskType.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTaskType.java new file mode 100644 index 000000000..315a09fd0 --- /dev/null +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ChannelTaskType.java @@ -0,0 +1,5 @@ +package org.opensearch.migrations.replay.datatypes; + +public enum ChannelTaskType { + TRANSMIT, CLOSE +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java index ae6554c71..642880d58 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/ConnectionReplaySession.java @@ -46,7 +46,7 @@ public ChannelFuture getInnerChannelFuture() { } public boolean hasWorkRemaining() { - return !schedule.isEmpty() || scheduleSequencer.hasPending(); + return scheduleSequencer.hasPending() || schedule.hasPendingTransmissions(); } public long calculateSizeSlowly() { 
diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java index eb637bc60..7e8f63387 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMap.java @@ -8,15 +8,16 @@ import java.util.TreeMap; public class TimeToResponseFulfillmentFutureMap { - TreeMap> timeToRunnableMap = new TreeMap<>(); - public void appendTask(Instant start, Runnable packetSender) { + TreeMap> timeToRunnableMap = new TreeMap<>(); + + public void appendTask(Instant start, ChannelTask task) { assert timeToRunnableMap.keySet().stream().allMatch(t->!t.isAfter(start)); var existing = timeToRunnableMap.computeIfAbsent(start, k->new ArrayDeque<>()); - existing.offer(packetSender); + existing.offer(task); } - public Map.Entry peekFirstItem() { + public Map.Entry peekFirstItem() { var e = timeToRunnableMap.firstEntry(); return e == null ? 
null : new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().peek()); } @@ -39,6 +40,17 @@ public boolean isEmpty() { return timeToRunnableMap.isEmpty(); } + public boolean hasPendingTransmissions() { + if (timeToRunnableMap.isEmpty()) { + return false; + } else { + return timeToRunnableMap.values().stream() + .flatMap(d->d.stream()) + .filter(ct->ct.kind==ChannelTaskType.TRANSMIT) + .findAny().isPresent(); + } + } + public long calculateSizeSlowly() { return timeToRunnableMap.values().stream().map(ArrayDeque::size).mapToInt(x->x).sum(); } @@ -60,4 +72,5 @@ private String formatBookends() { .toString(); } } + } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMapTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMapTest.java index f8d718854..2da823381 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMapTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datatypes/TimeToResponseFulfillmentFutureMapTest.java @@ -13,16 +13,16 @@ class TimeToResponseFulfillmentFutureMapTest { public void testAddsAndPopsAreOrdered() { var timeMap = new TimeToResponseFulfillmentFutureMap(); StringBuilder log = new StringBuilder(); - timeMap.appendTask(Instant.EPOCH, ()->log.append('A')); - timeMap.appendTask(Instant.EPOCH, ()->log.append('B')); - timeMap.appendTask(Instant.EPOCH.plus(Duration.ofMillis(1)), ()->log.append('C')); - timeMap.appendTask(Instant.EPOCH.plus(Duration.ofMillis(1)), ()->log.append('D')); + timeMap.appendTask(Instant.EPOCH, new ChannelTask(ChannelTaskType.TRANSMIT, ()->log.append('A'))); + timeMap.appendTask(Instant.EPOCH, new ChannelTask(ChannelTaskType.TRANSMIT, ()->log.append('B'))); + timeMap.appendTask(Instant.EPOCH.plus(Duration.ofMillis(1)), new 
ChannelTask(ChannelTaskType.TRANSMIT, ()->log.append('C'))); + timeMap.appendTask(Instant.EPOCH.plus(Duration.ofMillis(1)), new ChannelTask(ChannelTaskType.TRANSMIT, ()->log.append('D'))); while (true) { var t = timeMap.peekFirstItem(); if (t == null) { break; } - t.getValue().run(); + t.getValue().runnable.run(); timeMap.removeFirstItem(); } Assertions.assertEquals("ABCD", log.toString()); From 27799478d676ee3f33c5ed17afecb4a5eeb5b233 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 6 Dec 2023 13:25:26 -0500 Subject: [PATCH 4/6] Checkin a disabled test to verify that when a connection was composed of only requests that were suppressed, nothing (not even a close) will be emitted. This test, if enabled, breaks because we don't differentiate the different paths that can lead to close cases. Signed-off-by: Greg Schohn --- ...ReliableLoggingHttpRequestHandlerTest.java | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java index b0e87c6db..292b3fa1b 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpRequestHandlerTest.java @@ -8,6 +8,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.testutils.TestUtilities; @@ -136,6 +138,33 @@ private static Consumer 
getSingleByteAtATimeWriter(boolean useP }; } + // This test doesn't work yet, but this is an optimization. Getting connections with only a + // close observation is already a common occurrence. This is nice to have, so it's good to + // keep this warm and ready, but we don't need the feature for correctness. + @Disabled + @Test + @ValueSource(booleans = {false, true}) + public void testThatSuppressedCaptureWorks() throws Exception { + var streamMgr = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); + + var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", "uploader")); + EmbeddedChannel channel = new EmbeddedChannel( + new ConditionallyReliableLoggingHttpRequestHandler(offloader, headerCapturePredicate, x->true)); + getWriter(false, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + channel.close(); + var requestBytes = SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8); + + Assertions.assertEquals(0, streamMgr.flushCount.get()); + // we wrote the correct data to the downstream handler/channel + var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m->new ByteArrayInputStream(consumeIntoArray((ByteBuf)m))) + .collect(Collectors.toList()))) + .readAllBytes(); + log.info("outputdata = " + new String(outputData, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(requestBytes, outputData); + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throws Exception { From b2d34ba8b2c942a461eb745432a0430818ba0969 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Thu, 7 Dec 2023 10:07:49 -0500 Subject: [PATCH 5/6] Add code to handle Dropped signals. 
Since the following observations after a Drop will be more reads from the next request, rather than writes, the state model in reconstruction needed some changes. The tests also needed to be improved to make sequences that were well-formed. The invalid ones that HAD been generated were flawed in the same way that the accumulation logic was. Both effectively just ignored the DropRequest observation. Now neither does, and changes have been made to each accordingly. Since there's a new observation, I had to update the list of seeds to get complete coverage. Signed-off-by: Greg Schohn --- .../migrations/replay/Accumulation.java | 5 +++++ ...pturedTrafficToHttpTransactionAccumulator.java | 15 +++++++++++++-- .../replay/RequestResponsePacketPair.java | 7 +++---- ...edTrafficToHttpTransactionAccumulatorTest.java | 2 +- ...edTrafficToHttpTransactionAccumulatorTest.java | 10 +++++++++- .../migrations/replay/TrafficStreamGenerator.java | 13 ++++++++----- 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java index b3f65daf2..a65aae727 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java @@ -106,4 +106,9 @@ public void resetForNextRequest() { this.state = State.ACCUMULATING_READS; this.rrPair = null; } + + public void resetToIgnoreAndForgetCurrentRequest() { + this.state = State.WAITING_FOR_NEXT_READ_CHUNK; + this.rrPair = null; + } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java index 
dc5aca221..5eafc1b92 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java @@ -191,12 +191,15 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu } private static Optional handleObservationForSkipState(Accumulation accum, TrafficObservation observation) { + assert !observation.hasClose() : "close will be handled earlier in handleCloseObservationThatAffectEveryState"; if (accum.state == Accumulation.State.IGNORING_LAST_REQUEST) { - if (observation.hasWrite() || observation.hasWriteSegment() || observation.hasEndOfMessageIndicator()) { + if (observation.hasWrite() || observation.hasWriteSegment() || + observation.hasEndOfMessageIndicator() || + observation.hasRequestDropped()) { accum.state = Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK; } // ignore everything until we hit an EOM - return Optional.of(observation.hasClose() ? CONNECTION_STATUS.CLOSED : CONNECTION_STATUS.ALIVE); + return Optional.of(CONNECTION_STATUS.ALIVE); } else if (accum.state == Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK) { // already processed EOMs above. 
Be on the lookout to ignore writes if (!(observation.hasRead() || observation.hasReadSegment())) { @@ -270,6 +273,12 @@ private Optional handleObservationForReadState(@NonNull Accum var rrPair = accum.getRrPair(); assert rrPair.requestData.hasInProgressSegment(); rrPair.requestData.finalizeRequestSegments(timestamp); + } else if (observation.hasRequestDropped()){ + requestCounter.decrementAndGet(); + accum.getRrPair().getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored); + accum.resetToIgnoreAndForgetCurrentRequest(); + } else { + return Optional.empty(); } return Optional.of(CONNECTION_STATUS.ALIVE); } @@ -303,6 +312,8 @@ private Optional handleObservationForWriteState(Accumulation } else if (observation.hasRead() || observation.hasReadSegment()) { rotateAccumulationOnReadIfNecessary(connectionId, accum); return handleObservationForReadState(accum, observation, trafficStreamKey, timestamp); + } else { + return Optional.empty(); } return Optional.of(CONNECTION_STATUS.ALIVE); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java index 80b424748..cd1dabcc9 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/RequestResponsePacketPair.java @@ -10,7 +10,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; @Slf4j public class RequestResponsePacketPair { @@ -23,17 +22,17 @@ public enum ReconstructionStatus { HttpMessageAndTimestamp requestData; HttpMessageAndTimestamp responseData; - final ITrafficStreamKey trafficStreamKey; + final ITrafficStreamKey firstTrafficStreamKeyForRequest; List trafficStreamKeysBeingHeld; ReconstructionStatus completionStatus; public 
RequestResponsePacketPair(@NonNull ITrafficStreamKey startingAtTrafficStreamKey) { this.trafficStreamKeysBeingHeld = new ArrayList<>(); - trafficStreamKey = startingAtTrafficStreamKey; + firstTrafficStreamKeyForRequest = startingAtTrafficStreamKey; } @NonNull ITrafficStreamKey getBeginningTrafficStreamKey() { - return trafficStreamKey; + return firstTrafficStreamKeyForRequest; } public void addRequestData(Instant packetTimeStamp, byte[] data) { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ExhaustiveCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ExhaustiveCapturedTrafficToHttpTransactionAccumulatorTest.java index ba9d26c19..8294aec17 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ExhaustiveCapturedTrafficToHttpTransactionAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ExhaustiveCapturedTrafficToHttpTransactionAccumulatorTest.java @@ -60,7 +60,7 @@ static Arguments[] generateTestCombinations() { return generateAllTestsAndConfirmComplete( // IntStream.generate(()->rand.nextInt()) TrafficStreamGenerator.RANDOM_GENERATOR_SEEDS_FOR_SUFFICIENT_TRAFFIC_VARIANCE -// List.of(1705850753) +// List.of(2110766901) .stream().mapToInt(i->i) ); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java index 669d6c5ba..bb1754f93 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java @@ -26,6 
+26,7 @@ import java.util.Arrays; import java.util.List; import java.util.SortedSet; +import java.util.StringJoiner; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -80,6 +81,11 @@ public static ObservationDirective write(int i) { public static ObservationDirective flush() { return new ObservationDirective(OffloaderCommandType.Flush, 0); } + + @Override + public String toString() { + return "(" + offloaderCommandType + ":" + size + ")"; + } } public static InMemoryConnectionCaptureFactory buildSerializerFactory(int bufferSize, Runnable onClosedCallback) { @@ -231,7 +237,9 @@ public void onConnectionClose(ISourceTrafficChannelKey key, int channelInteracti List trafficStreamKeysBeingHeld) { } - @Override public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey tsk) {} + @Override public void onTrafficStreamIgnored(@NonNull ITrafficStreamKey tsk) { + tsIndicesReceived.add(tsk.getTrafficStreamIndex()); + } }); var tsList = trafficStreams.collect(Collectors.toList()); trafficStreams = tsList.stream(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java index 67b49ebbf..cc13dfda5 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java @@ -32,11 +32,11 @@ public class TrafficStreamGenerator { public static final int MAX_READS_IN_REQUEST = 5; public static final int MAX_WRITES_IN_RESPONSE = 5; public static final List RANDOM_GENERATOR_SEEDS_FOR_SUFFICIENT_TRAFFIC_VARIANCE = List.of( - -1155869325, 892128508, 155629808, 1429008869, -138487339, 26273138, 685382526, - -226796111, -270230103, 1705850753, -1978864692, -836540342, 1181244667,-193570837, - 
-1617640095, -1359243304, -1973979577, -67333094, -2129721489, -2114584541, -1121163101, - 866345822, 1302951949, 30280569, -1199907813, 34574822, 2109003795, -1349584475, -2050877083, - 160359681, -1345969040, -20026097, 793184536, 834861033 + -1155869325, 892128508, 155629808, 1429008869, 26273138, 685382526, 1705850753, -1978864692, + -836540342, -193570837, -1617640095, -1359243304, -1973979577, -67333094, -2129721489, + 2110766901, -1121163101, 866345822, -297497515, -736375570, 30280569, -1199907813, + 1887084032, 519330814, -2050877083, 174127839, 1712524135, -861378278, 793184536, 174500816, + 237039773, 944491332 ); public enum ObservationType { @@ -191,6 +191,9 @@ private static void fillCommandsAndSizes(Random r, double cancelRequestLikelihoo ()-> SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.read(r.nextInt(bufferBound))); if (r.nextDouble() <= cancelRequestLikelihood) { commands.add(SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.cancelOffload()); + ++i; // compensate to get the right number of requests in the loop + sizes.remove(sizes.size()-1); // This won't show up as a request, so don't propagate it + continue; } if (r.nextDouble() <= flushLikelihood) { commands.add(SimpleCapturedTrafficToHttpTransactionAccumulatorTest.ObservationDirective.flush()); From 927db7351ec5fbebde1b27c6d20c627715fdbedc Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Fri, 8 Dec 2023 00:47:09 -0500 Subject: [PATCH 6/6] Fix a bug where the sourceRequestIndex was being incorrectly inferred when the first observed trafficStream had a carry-over request that was ALSO going to be ignored (in a subsequent TrafficStream). There are some other changes included here because it wasn't clear WHAT the cause of the duplicates was. 
Now the test prints out the first characters (which are now always printable ascii) from each read/write observation and the callback compares that the result wasn't the same as the previous one (which was the issue before this patch). Signed-off-by: Greg Schohn --- .../protos/TrafficStreamUtils.java | 25 ++++++++++++++- .../migrations/replay/Accumulation.java | 11 ++++++- ...edTrafficToHttpTransactionAccumulator.java | 31 ++++++++++++------- .../KafkaRestartingTrafficReplayerTest.java | 1 + ...afficToHttpTransactionAccumulatorTest.java | 10 +++++- .../replay/TrafficReplayerRunner.java | 4 +++ .../replay/TrafficStreamGenerator.java | 6 ++++ 7 files changed, 73 insertions(+), 15 deletions(-) diff --git a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java index f247de68d..84fe514fa 100644 --- a/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java +++ b/TrafficCapture/coreUtilities/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java @@ -1,6 +1,8 @@ package org.opensearch.migrations.trafficcapture.protos; import com.google.protobuf.Timestamp; + +import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Optional; import java.util.stream.Collectors; @@ -22,11 +24,32 @@ public static Optional getFirstTimestamp(TrafficStream ts) { public static String summarizeTrafficStream(TrafficStream ts) { var listSummaryStr = ts.getSubStreamList().stream() - .map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase())) + .map(tso->instantFromProtoTimestamp(tso.getTs()) + ": " + captureCaseToString(tso.getCaptureCase()) + + getOptionalContext(tso)) .collect(Collectors.joining(", ")); return ts.getConnectionId() + " (#" + getTrafficStreamIndex(ts) + 
")[" + listSummaryStr + "]"; } + private static Object getOptionalContext(TrafficObservation tso) { + return Optional.ofNullable(getByteArrayForDataOf(tso)) + .map(b->" " + new String(b, 0, Math.min(3, b.length), StandardCharsets.UTF_8)) + .orElse(""); + } + + private static byte[] getByteArrayForDataOf(TrafficObservation tso) { + if (tso.hasRead()) { + return tso.getRead().getData().toByteArray(); + } else if (tso.hasReadSegment()) { + return tso.getReadSegment().getData().toByteArray(); + } else if (tso.hasWrite()) { + return tso.getWrite().getData().toByteArray(); + } else if (tso.hasWriteSegment()) { + return tso.getWriteSegment().getData().toByteArray(); + } else { + return null; + } + } + public static int getTrafficStreamIndex(TrafficStream ts) { return ts.hasNumber() ? ts.getNumber() : ts.getNumberOfThisLastChunk(); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java index a65aae727..f43440c08 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/Accumulation.java @@ -4,6 +4,7 @@ import org.opensearch.migrations.replay.datatypes.ISourceTrafficChannelKey; import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey; +import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import java.time.Instant; import java.util.concurrent.atomic.AtomicInteger; @@ -26,7 +27,12 @@ enum State { AtomicLong newestPacketTimestampInMillis; State state; AtomicInteger numberOfResets; - final int startingSourceRequestIndex; + int startingSourceRequestIndex; + + public Accumulation(ITrafficStreamKey key, TrafficStream ts) { + this(key, 
ts.getPriorRequestsReceived()+(ts.hasLastObservationWasUnterminatedRead()?1:0), + ts.getLastObservationWasUnterminatedRead()); + } public Accumulation(@NonNull ITrafficStreamKey trafficChannelKey, int startingSourceRequestIndex) { this(trafficChannelKey, startingSourceRequestIndex, false); @@ -108,6 +114,9 @@ public void resetForNextRequest() { } public void resetToIgnoreAndForgetCurrentRequest() { + if (state == State.IGNORING_LAST_REQUEST) { + --startingSourceRequestIndex; + } this.state = State.WAITING_FOR_NEXT_READ_CHUNK; this.rrPair = null; } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java index 5eafc1b92..4f60258e3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/CapturedTrafficToHttpTransactionAccumulator.java @@ -87,7 +87,7 @@ public String appendageToDescribeHowToSetMinimumGuaranteedLifetime() { public void onExpireAccumulation(String partitionId, Accumulation accumulation) { connectionsExpiredCounter.incrementAndGet(); log.atTrace().setMessage(()->"firing accumulation for accum=[" - + accumulation.getRequestKey() + "]=" + accumulation) + + accumulation.getRrPair().getBeginningTrafficStreamKey() + "]=" + accumulation) .log(); fireAccumulationsCallbacksAndClose(accumulation, RequestResponsePacketPair.ReconstructionStatus.EXPIRED_PREMATURELY); @@ -163,10 +163,7 @@ private Accumulation createInitialAccumulation(ITrafficStreamWithKey streamWithK " are encountered. 
Full stream object=" + stream).log(); } - var startingSourceRequestOffset = - stream.getPriorRequestsReceived()+(stream.hasLastObservationWasUnterminatedRead()?1:0); - return new Accumulation(streamWithKey.getKey(), - startingSourceRequestOffset, stream.getLastObservationWasUnterminatedRead()); + return new Accumulation(streamWithKey.getKey(), stream); } private enum CONNECTION_STATUS { @@ -176,7 +173,7 @@ private enum CONNECTION_STATUS { public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accum, @NonNull ITrafficStreamKey trafficStreamKey, TrafficObservation observation) { - log.atTrace().setMessage(()->"Adding observation: "+observation).log(); + log.atTrace().setMessage(()->"Adding observation: "+observation+" with state="+accum.state).log(); var timestamp = TrafficStreamUtils.instantFromProtoTimestamp(observation.getTs()); liveStreams.expireOldEntries(trafficStreamKey, accum, timestamp); @@ -190,13 +187,14 @@ public CONNECTION_STATUS addObservationToAccumulation(@NonNull Accumulation accu }); } - private static Optional handleObservationForSkipState(Accumulation accum, TrafficObservation observation) { + private Optional handleObservationForSkipState(Accumulation accum, TrafficObservation observation) { assert !observation.hasClose() : "close will be handled earlier in handleCloseObservationThatAffectEveryState"; if (accum.state == Accumulation.State.IGNORING_LAST_REQUEST) { if (observation.hasWrite() || observation.hasWriteSegment() || - observation.hasEndOfMessageIndicator() || - observation.hasRequestDropped()) { + observation.hasEndOfMessageIndicator()) { accum.state = Accumulation.State.WAITING_FOR_NEXT_READ_CHUNK; + } else if (observation.hasRequestDropped()) { + handleDroppedRequestForAccumulation(accum); } // ignore everything until we hit an EOM return Optional.of(CONNECTION_STATUS.ALIVE); @@ -275,8 +273,7 @@ private Optional handleObservationForReadState(@NonNull Accum rrPair.requestData.finalizeRequestSegments(timestamp); } 
else if (observation.hasRequestDropped()){ requestCounter.decrementAndGet(); - accum.getRrPair().getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored); - accum.resetToIgnoreAndForgetCurrentRequest(); + handleDroppedRequestForAccumulation(accum); } else { return Optional.empty(); } @@ -319,6 +316,15 @@ private Optional handleObservationForWriteState(Accumulation } + private void handleDroppedRequestForAccumulation(Accumulation accum) { + if (accum.hasRrPair()) { + accum.getRrPair().getTrafficStreamsHeld().forEach(listener::onTrafficStreamIgnored); + } + log.atTrace().setMessage(()->"resetting to forget "+ accum.trafficChannelKey).log(); + accum.resetToIgnoreAndForgetCurrentRequest(); + log.atTrace().setMessage(()->"done resetting to forget and accum="+ accum).log(); + } + // This function manages the transition case when an observation comes in that would terminate // any previous HTTP transaction for the connection. It returns true if there WAS a previous // transaction that has been reset and false otherwise @@ -326,7 +332,7 @@ private boolean rotateAccumulationIfNecessary(String connectionId, Accumulation // If this was brand new, we don't need to care about triggering the callback. // We only need to worry about this if we have yet to send the RESPONSE. 
if (accum.state == Accumulation.State.ACCUMULATING_WRITES) { - log.atDebug().setMessage(()->"Resetting accum[" + connectionId + "]=" + accum).log(); + log.atDebug().setMessage(()->"handling EOM for accum[" + connectionId + "]=" + accum).log(); handleEndOfResponse(accum, RequestResponsePacketPair.ReconstructionStatus.COMPLETE); return true; } @@ -367,6 +373,7 @@ private void handleEndOfResponse(Accumulation accumulation, var rrPair = accumulation.getRrPair(); rrPair.completionStatus = status; listener.onFullDataReceived(accumulation.getRequestKey(), rrPair); + log.atTrace().setMessage("resetting for end of response").log(); accumulation.resetForNextRequest(); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java index 8d55a6ffc..264c2bda3 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java @@ -11,6 +11,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource; import org.opensearch.migrations.replay.traffic.source.ISimpleTrafficCaptureSource; diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java index bb1754f93..f9833785c 100644 --- 
a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/SimpleCapturedTrafficToHttpTransactionAccumulatorTest.java @@ -92,11 +92,19 @@ public static InMemoryConnectionCaptureFactory buildSerializerFactory(int buffer return new InMemoryConnectionCaptureFactory("TEST_NODE_ID", bufferSize, onClosedCallback); } + private static byte nextPrintable(int i) { + final char firstChar = ' '; + final byte lastChar = '~'; + var r = (byte) (i%(lastChar-firstChar)); + return (byte) ((r < 0) ? (lastChar + r) : (byte) (r + firstChar)); + } + static ByteBuf makeSequentialByteBuf(int offset, int size) { var bb = Unpooled.buffer(size); + final var b = nextPrintable(offset); for (int i=0; it.sourcePair.equals(prevT.sourcePair)).orElse(false)); var totalUnique = null != completelyHandledItems.put(keyString, t) ? totalUniqueEverReceived.get() : totalUniqueEverReceived.incrementAndGet(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java index cc13dfda5..abd0f17e2 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/TrafficStreamGenerator.java @@ -6,6 +6,7 @@ import org.opensearch.migrations.testutils.StreamInterleaver; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; +import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils; import java.util.ArrayList; import java.util.Arrays; @@ -270,6 +271,11 @@ public static class StreamAndExpectedSizes { 
generateRandomTrafficStreamsAndSizes(IntStream.range(0,count)) : generateAllIndicativeRandomTrafficStreamsAndSizes(); var testCaseArr = generatedCases.toArray(RandomTrafficStreamAndTransactionSizes[]::new); + log.atInfo().setMessage(()-> + Arrays.stream(testCaseArr) + .flatMap(tc->Arrays.stream(tc.trafficStreams).map(TrafficStreamUtils::summarizeTrafficStream)) + .collect(Collectors.joining("\n"))) + .log(); var aggregatedStreams = randomize ? StreamInterleaver.randomlyInterleaveStreams(new Random(count), Arrays.stream(testCaseArr).map(c->Arrays.stream(c.trafficStreams))) :