From 04735696eabbb50db0e5804974443f313ade978a Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 28 Mar 2024 18:57:08 -0500 Subject: [PATCH 1/3] Support Limitless CodedOutputStreams Signed-off-by: Andre Kurait --- .../kafkaoffloader/KafkaCaptureFactory.java | 5 ++ ...CodedOutputStreamAndByteBufferWrapper.java | 7 +- .../CodedOutputStreamHolder.java | 23 ++++++ ...eamChannelConnectionCaptureSerializer.java | 76 +++++++++--------- ...hannelConnectionCaptureSerializerTest.java | 49 +++++++++++- .../proxyserver/CaptureProxy.java | 16 +++- .../CaptureProxyConfigurationTest.java | 78 +++++++++++++++++++ .../testcontainers/CaptureProxyContainer.java | 29 +++++-- 8 files changed, 235 insertions(+), 48 deletions(-) create mode 100644 TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxyConfigurationTest.java diff --git a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java index 25c1c6ebd..f78ac146d 100644 --- a/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java +++ b/TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java @@ -60,6 +60,11 @@ public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId, static class CodedOutputStreamWrapper implements CodedOutputStreamHolder { private final CodedOutputStream codedOutputStream; private final ByteBuffer byteBuffer; + @Override + public int getOutputStreamBytesLimit() { + return byteBuffer.limit(); + } + @Override public @NonNull CodedOutputStream getOutputStream() { return codedOutputStream; diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamAndByteBufferWrapper.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamAndByteBufferWrapper.java index 7cbd0bf52..7c064cf79 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamAndByteBufferWrapper.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamAndByteBufferWrapper.java @@ -6,16 +6,19 @@ import java.nio.ByteBuffer; +@Getter public class CodedOutputStreamAndByteBufferWrapper implements CodedOutputStreamHolder { @NonNull - @Getter private final CodedOutputStream outputStream; @NonNull - @Getter private final ByteBuffer byteBuffer; public CodedOutputStreamAndByteBufferWrapper(int bufferSize) { this.byteBuffer = ByteBuffer.allocate(bufferSize); outputStream = CodedOutputStream.newInstance(byteBuffer); } + + public int getOutputStreamBytesLimit() { + return byteBuffer.limit(); + } } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamHolder.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamHolder.java index 38de22104..362b14ce6 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamHolder.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamHolder.java @@ -4,5 +4,28 @@ import lombok.NonNull; public interface CodedOutputStreamHolder { + + /** + * Returns the maximum number of bytes that can be written to the output stream before exceeding + * its limit, or -1 if the stream has no defined limit. + * + * @return the byte limit of the output stream, or -1 if no limit exists. + */ + int getOutputStreamBytesLimit(); + + /** + * Calculates the remaining space in the output stream based on the limit set by + * {@link #getOutputStreamBytesLimit()}. If the limit is defined, this method returns + * the difference between that limit and the number of bytes already written. If no + * limit is defined, returns -1, indicating unbounded space. + * + * @return the number of remaining bytes that can be written before reaching the limit, + * or -1 if the stream is unbounded. + */ + default int getOutputStreamSpaceLeft() { + var limit = getOutputStreamBytesLimit(); + return (limit != -1) ? limit - getOutputStream().getTotalBytesWritten() : -1; + } + @NonNull CodedOutputStream getOutputStream(); } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index ee06b4443..8b56e4646 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -4,6 +4,7 @@ import com.google.protobuf.Descriptors; import com.google.protobuf.Timestamp; import io.netty.buffer.ByteBuf; +import java.util.function.Supplier; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; @@ -91,7 +92,7 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN return d.findFieldByNumber(fieldNumber).getLiteType().getWireType(); } - private CodedOutputStream getOrCreateCodedOutputStream() throws IOException { + private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException { if (streamHasBeenClosed) { // In an abundance of caution, flip the state back to basically act like a whole new // stream is being setup @@ -105,7 +106,7 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException { streamHasBeenClosed = false; } if (currentCodedOutputStreamHolderOrNull != null) { - return currentCodedOutputStreamHolderOrNull.getOutputStream(); + return currentCodedOutputStreamHolderOrNull; } else { currentCodedOutputStreamHolderOrNull = streamManager.createStream(); var currentCodedOutputStream = currentCodedOutputStreamHolderOrNull.getOutputStream(); @@ -122,17 +123,26 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException { currentCodedOutputStream.writeBool(TrafficStream.LASTOBSERVATIONWASUNTERMINATEDREAD_FIELD_NUMBER, readObservationsAreWaitingForEom); } - return currentCodedOutputStream; + return currentCodedOutputStreamHolderOrNull; } } + public CompletableFuture flushIfNeeded(Supplier requiredSize) throws IOException { + var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); + if (spaceLeft != -1 && spaceLeft < requiredSize.get()) { + return flushCommitAndResetStream(false); + } + return CompletableFuture.completedFuture(null); + } + + private void writeTrafficStreamTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber)); } private void writeObservationTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficObservation.getDescriptor(), fieldNumber)); } @@ -147,43 +157,39 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber); final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize; // Ensure space is available before starting an observation - if (getOrCreateCodedOutputStream().spaceLeft() < - CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)) - { - flushCommitAndResetStream(false); - } + flushIfNeeded(() -> CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)); // e.g. 2 { writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER); // Write observation content length - getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(observationContentSize); // e.g. 1 { 1: 1234 2: 1234 } writeTimestampForNowToCurrentStream(timestamp); } private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOException { writeObservationTag(TrafficObservation.TS_FIELD_NUMBER); - getOrCreateCodedOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); - getOrCreateCodedOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); if (timestamp.getNano() != 0) { - getOrCreateCodedOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); } } private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException { if (byteBuffer.remaining() > 0) { - getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeByteBuffer(fieldNum, byteBuffer); } else { - getOrCreateCodedOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); } } private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException { if (str.length() > 0) { - getOrCreateCodedOutputStream().writeString(fieldNum, str); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeString(fieldNum, str); } else { - getOrCreateCodedOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); } } @@ -193,7 +199,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO return CompletableFuture.completedFuture(null); } try { - CodedOutputStream currentStream = getOrCreateCodedOutputStream(); + CodedOutputStream currentStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER; // e.g. 3: 1 currentStream.writeInt32(fieldNum, ++numFlushesSoFar); @@ -213,7 +219,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO @Override public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1); - getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, + getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, RequestIntentionallyDropped.getDefaultInstance()); this.readObservationsAreWaitingForEom = false; this.firstLineByteLength = -1; @@ -238,7 +244,7 @@ public void addDisconnectEvent(Instant timestamp) throws IOException { @Override public void addCloseEvent(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1); - getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, + getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, CloseObservation.getDefaultInstance()); } @@ -259,7 +265,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, // e.g. 4 { writeObservationTag(captureFieldNumber); if (dataSize > 0) { - getOrCreateCodedOutputStream().writeInt32NoTag(dataSize); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32NoTag(dataSize); } writeByteStringToCurrentStream(dataFieldNumber, str); } @@ -286,20 +292,20 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary. - if (trafficStreamOverhead + 1 >= getOrCreateCodedOutputStream().spaceLeft()) { - flushCommitAndResetStream(false); - } + flushIfNeeded(() -> trafficStreamOverhead); // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue - if (byteBuffer.limit() == 0 || messageAndOverheadBytesLeft <= getOrCreateCodedOutputStream().spaceLeft()) { - int minExpectedSpaceAfterObservation = getOrCreateCodedOutputStream().spaceLeft() - messageAndOverheadBytesLeft; + var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); + if (byteBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) { + int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft; addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, byteBuffer); observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber); return; } while(byteBuffer.position() < byteBuffer.limit()) { - int availableCOSSpace = getOrCreateCodedOutputStream().spaceLeft(); + // COS checked for unbounded limit above + int availableCOSSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position(); ByteBuffer bb = byteBuffer.slice(); bb.limit(chunkBytes); @@ -323,7 +329,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in int dataSize = 0; int segmentCountSize = 0; int captureClosureLength = 1; - CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream(); + CodedOutputStream codedOutputStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); if (dataCountFieldNumber > 0) { segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount); } @@ -444,19 +450,19 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize); // e.g. 15 { writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER); - getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize); - getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); - getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(eomPairSize); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); } private void writeEndOfSegmentMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1); - getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); + getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); } private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException { - int actualRemainingSpace = getOrCreateCodedOutputStream().spaceLeft(); - if (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0) { + int actualRemainingSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); + if (actualRemainingSpace != -1 && (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0)) { log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " + "at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1), actualRemainingSpace, minExpectedSpaceAfterObservation); diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 88ee5171d..b604338cd 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -3,9 +3,11 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; import io.netty.buffer.Unpooled; import java.io.IOException; +import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -18,9 +20,11 @@ import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; import lombok.Lombok; +import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializerTest.StreamManager.NullStreamManager; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; @@ -28,7 +32,6 @@ import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; - import org.opensearch.migrations.trafficcapture.protos.WriteObservation; @Slf4j @@ -187,6 +190,22 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream() Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size()); } + @Test + public void testWithLimitlessCodedOutputStreamHolder() + throws IOException, ExecutionException, InterruptedException { + + var serializer = new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING, + TEST_TRAFFIC_STREAM_ID_STRING, + new NullStreamManager()); + + var bb = Unpooled.buffer(0); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); + var future = serializer.flushCommitAndResetStream(true); + future.get(); + bb.release(); + } + @Test public void testThatReadCanBeDeserialized() throws IOException, ExecutionException, InterruptedException { // these are only here as a debugging aid @@ -360,5 +379,33 @@ protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder out } }).thenApply(x -> null); } + + static class NullStreamManager implements StreamLifecycleManager { + + @Override + public CodedOutputStreamHolder createStream() { + return new CodedOutputStreamHolder() { + final CodedOutputStream nullOutputStream = CodedOutputStream.newInstance( + OutputStream.nullOutputStream()); + + @Override + public int getOutputStreamBytesLimit() { + return -1; + } + + @Override + public @NonNull CodedOutputStream getOutputStream() { + return nullOutputStream; + } + }; + } + + @Override + public CompletableFuture closeStream(CodedOutputStreamHolder outputStreamHolder, + int index) { + return CompletableFuture.completedFuture(null); + } + + } } } \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java index 6c8afd3e0..64aa22008 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxy.java @@ -7,6 +7,7 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import java.io.OutputStream; import lombok.Lombok; import lombok.NonNull; import lombok.SneakyThrows; @@ -15,7 +16,6 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.SaslConfigs; -import org.apache.logging.log4j.core.util.NullOutputStream; import org.opensearch.common.settings.Settings; import org.opensearch.migrations.tracing.RootOtelContext; import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder; @@ -190,7 +190,19 @@ protected static IConnectionCaptureFactory getNullConnectionCaptureFacto new StreamLifecycleManager<>() { @Override public CodedOutputStreamHolder createStream() { - return () -> CodedOutputStream.newInstance(NullOutputStream.getInstance()); + return new CodedOutputStreamHolder() { + final CodedOutputStream nullOutputStream = CodedOutputStream.newInstance(OutputStream.nullOutputStream()); + + @Override + public int getOutputStreamBytesLimit() { + return -1; + } + + @Override + public @NonNull CodedOutputStream getOutputStream() { + return nullOutputStream; + } + }; } @Override diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxyConfigurationTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxyConfigurationTest.java new file mode 100644 index 000000000..d070de0cf --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/CaptureProxyConfigurationTest.java @@ -0,0 +1,78 @@ +package org.opensearch.migrations.trafficcapture.proxyserver; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.opensearch.migrations.testutils.SimpleHttpClientForTesting; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest; + +@Slf4j +@HttpdContainerTest +public class CaptureProxyConfigurationTest { + + private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase(); + private static final String HTTPD_GET_EXPECTED_RESPONSE = "

It works!

\n"; + private static final int DEFAULT_NUMBER_OF_CALLS = 3; + private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis(); + + @BeforeAll + public static void setUp() { + httpdTestBase.start(); + } + + @AfterAll + public static void tearDown() { + httpdTestBase.stop(); + } + + private static void assertLessThan(long ceiling, long actual) { + Assertions.assertTrue(actual < ceiling, + () -> "Expected actual value to be less than " + ceiling + " but was " + actual + "."); + } + + @Test + public void testCaptureProxyWithNoCapturePassesRequest() { + try (var captureProxy = new CaptureProxyContainer(httpdTestBase.getContainer())) { + captureProxy.start(); + + var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS); + + assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis()); + } + } + + private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) { + return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls); + } + + private Duration assertBasicCalls(String endpoint, int numberOfCalls) { + return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint)) + .reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls); + } + + + private Duration assertBasicCall(String endpoint) { + try (var client = new SimpleHttpClientForTesting()) { + long startTimeNanos = System.nanoTime(); + var response = client.makeGetRequest(URI.create(endpoint), Stream.empty()); + long endTimeNanos = System.nanoTime(); + + var responseBody = new String(response.payloadBytes); + assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody); + return Duration.ofNanos(endTimeNanos - startTimeNanos); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java index ed9f44d67..6269b071c 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/CaptureProxyContainer.java @@ -2,6 +2,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.function.Supplier; @@ -42,6 +43,10 @@ public CaptureProxyContainer(final Container destination, final KafkaContaine this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka)); } + public CaptureProxyContainer(final Container destination) { + this(() -> getUriFromContainer(destination), null); + } + public static String getUriFromContainer(final Container container) { return "http://" + container.getHost() + ":" + container.getFirstMappedPort(); } @@ -51,14 +56,22 @@ public void start() { this.listeningPort = PortFinder.findOpenPort(); serverThread = new Thread(() -> { try { - String[] args = { - "--kafkaConnection", kafkaUriSupplier.get(), - "--destinationUri", destinationUriSupplier.get(), - "--listenPort", String.valueOf(listeningPort), - "--insecureDestination" - }; - - CaptureProxy.main(args); + List argsList = new ArrayList<>(); + + if (kafkaUriSupplier != null) { + argsList.add("--kafkaConnection"); + argsList.add(kafkaUriSupplier.get()); + } else { + argsList.add("--noCapture"); + } + + argsList.add("--destinationUri"); + argsList.add(destinationUriSupplier.get()); + argsList.add("--listenPort"); + argsList.add(String.valueOf(listeningPort)); + argsList.add("--insecureDestination"); + + CaptureProxy.main(argsList.toArray(new String[0])); } catch (Exception e) { throw new AssertionError("Should not have exception", e); } From 6e490f7c84b97701bd68c5bb122ea645209b770e Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 29 Mar 2024 10:13:33 -0500 Subject: [PATCH 2/3] Address PR Comments Signed-off-by: Andre Kurait --- ...eamChannelConnectionCaptureSerializer.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 8b56e4646..792eb0400 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -92,6 +92,10 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN return d.findFieldByNumber(fieldNumber).getLiteType().getWireType(); } + private CodedOutputStream getOrCreateCodedOutputStream() throws IOException { + return getOrCreateCodedOutputStreamHolder().getOutputStream(); + } + private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException { if (streamHasBeenClosed) { // In an abundance of caution, flip the state back to basically act like a whole new @@ -137,12 +141,12 @@ public CompletableFuture flushIfNeeded(Supplier requiredSize) throws private void writeTrafficStreamTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber)); } private void writeObservationTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficObservation.getDescriptor(), fieldNumber)); } @@ -161,35 +165,35 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum // e.g. 2 { writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER); // Write observation content length - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(observationContentSize); + getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize); // e.g. 1 { 1: 1234 2: 1234 } writeTimestampForNowToCurrentStream(timestamp); } private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOException { writeObservationTag(TrafficObservation.TS_FIELD_NUMBER); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); + getOrCreateCodedOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); + getOrCreateCodedOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); if (timestamp.getNano() != 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); + getOrCreateCodedOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); } } private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException { if (byteBuffer.remaining() > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeByteBuffer(fieldNum, byteBuffer); + getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer); } else { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStream().writeUInt32NoTag(0); } } private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException { if (str.length() > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeString(fieldNum, str); + getOrCreateCodedOutputStream().writeString(fieldNum, str); } else { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStream().writeUInt32NoTag(0); } } @@ -199,7 +203,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO return CompletableFuture.completedFuture(null); } try { - CodedOutputStream currentStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); + CodedOutputStream currentStream = getOrCreateCodedOutputStream(); var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER; // e.g. 3: 1 currentStream.writeInt32(fieldNum, ++numFlushesSoFar); @@ -219,7 +223,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO @Override public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, RequestIntentionallyDropped.getDefaultInstance()); this.readObservationsAreWaitingForEom = false; this.firstLineByteLength = -1; @@ -244,7 +248,7 @@ public void addDisconnectEvent(Instant timestamp) throws IOException { @Override public void addCloseEvent(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, CloseObservation.getDefaultInstance()); } @@ -265,7 +269,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, // e.g. 4 { writeObservationTag(captureFieldNumber); if (dataSize > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32NoTag(dataSize); + getOrCreateCodedOutputStream().writeInt32NoTag(dataSize); } writeByteStringToCurrentStream(dataFieldNumber, str); } @@ -292,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary. - flushIfNeeded(() -> trafficStreamOverhead); + flushIfNeeded(() -> (trafficStreamOverhead + 1)); // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); @@ -329,7 +333,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in int dataSize = 0; int segmentCountSize = 0; int captureClosureLength = 1; - CodedOutputStream codedOutputStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); + CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream(); if (dataCountFieldNumber > 0) { segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount); } @@ -450,14 +454,14 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize); // e.g. 15 { writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(eomPairSize); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); + getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize); + getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); + getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); } private void writeEndOfSegmentMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); } private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException { From 5d0538447816c6b1e21b0fdc2374652e161e95e9 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 29 Mar 2024 13:53:51 -0500 Subject: [PATCH 3/3] Add test to confirm buffer limits respected in StreamChannelConnectionCaptureSerializerTest Signed-off-by: Andre Kurait --- ...mChannelConnectionCaptureSerializerTest.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index b604338cd..b2bd5d54a 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -83,12 +83,18 @@ private static int getIndexForTrafficStream(TrafficStream s) { @Test public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, InterruptedException { + var bufferSize = 1024 * 1024; var outputBuffersCreated = new ConcurrentLinkedQueue(); - var serializer = createSerializerWithTestHandler(outputBuffersCreated, 1024 * 1024); + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSize); + + double minGeneratedChunks = 2.2; + int dataRepeat = (int) Math.ceil((((double) bufferSize / FAKE_READ_PACKET_DATA.length()) * minGeneratedChunks)); - // Create over 1MB packet - String data = FAKE_READ_PACKET_DATA.repeat((1024 * 1024 / FAKE_READ_PACKET_DATA.length()) + 1); + String data = FAKE_READ_PACKET_DATA.repeat(dataRepeat); byte[] fakeDataBytes = data.getBytes(StandardCharsets.UTF_8); + + int expectedChunks = (fakeDataBytes.length / bufferSize) + ((fakeDataBytes.length % bufferSize == 0) ? 0 : 1); + var bb = Unpooled.wrappedBuffer(fakeDataBytes); serializer.addReadEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); @@ -98,17 +104,18 @@ public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, var outputBuffersList = new ArrayList<>(outputBuffersCreated); var reconstitutedTrafficStreamsList = new ArrayList(); - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < expectedChunks; ++i) { reconstitutedTrafficStreamsList.add(TrafficStream.parseFrom(outputBuffersList.get(i))); } reconstitutedTrafficStreamsList.sort( Comparator.comparingInt(StreamChannelConnectionCaptureSerializerTest::getIndexForTrafficStream)); int totalSize = 0; - for (int i = 0; i < 2; ++i) { + for (int i = 0; i < expectedChunks; ++i) { var reconstitutedTrafficStream = reconstitutedTrafficStreamsList.get(i); int dataSize = reconstitutedTrafficStream.getSubStream(0).getReadSegment().getData().size(); totalSize += dataSize; Assertions.assertEquals(i + 1, getIndexForTrafficStream(reconstitutedTrafficStream)); + Assertions.assertTrue(dataSize <= bufferSize); } Assertions.assertEquals(fakeDataBytes.length, totalSize); }