diff --git a/TrafficCapture/captureOffloader/build.gradle b/TrafficCapture/captureOffloader/build.gradle index cdc0350c6..c17af9da4 100644 --- a/TrafficCapture/captureOffloader/build.gradle +++ b/TrafficCapture/captureOffloader/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.26' implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' + testImplementation testFixtures(project(path: ':testUtilities')) testImplementation project(':coreUtilities') testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0' testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0' diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java index e6335a238..50fdcf103 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java @@ -2,10 +2,10 @@ import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; +import io.netty.buffer.ByteBuf; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; -import java.nio.ByteBuffer; import java.time.Instant; /** @@ -28,18 +28,18 @@ public static int getSizeOfTimestamp(Instant t) { /** * This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed - * from the given ByteBuffer and associated segment field numbers and values passed in. This estimate is essentially - * the max size needed in the CodedOutputStream to store the provided ByteBuffer data and its associated TrafficStream + * from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially + * the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream * overhead. The actual required bytes could be marginally smaller. */ public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int observationFieldNumber, - int dataFieldNumber, ByteBuffer buffer) { + int dataFieldNumber, ByteBuf buf) { // Timestamp required bytes int tsContentSize = getSizeOfTimestamp(timestamp); int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize; // Capture required bytes - int dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, buffer); + int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf); int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize; // Observation and closing index required bytes @@ -47,6 +47,22 @@ public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int Integer.MAX_VALUE); } + /** + * This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag. 
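+ * For example, writing a buf with 10 readable bytes under field number 2 takes 1 tag byte + 1 varint length byte + 10 payload bytes = 12 bytes in total, as exercised by the unit tests added below.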
+ */ + public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buf) { + return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buf); + } + + /** + * This function determines the number of bytes needed to write the readable bytes in a byteBuf. + */ + public static int computeByteBufRemainingSizeNoTag(ByteBuf buf) { + int bufSize = buf.readableBytes(); + return CodedOutputStream.computeUInt32SizeNoTag(bufSize) + bufSize; + } + + /** * This function determines the number of bytes needed to store a TrafficObservation and a closing index for a * TrafficStream, from the provided input. diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index d8c7804d6..345c31f1d 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -1,12 +1,15 @@ package org.opensearch.migrations.trafficcapture; +import com.google.protobuf.ByteOutput; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Descriptors; import com.google.protobuf.Timestamp; +import com.google.protobuf.WireFormat; import io.netty.buffer.ByteBuf; -import java.util.function.IntSupplier; -import java.util.function.Supplier; +import java.io.UncheckedIOException; +import java.nio.channels.GatheringByteChannel; +import java.util.stream.IntStream; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; @@ -133,9 +136,29 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx } } - public CompletableFuture flushIfNeeded(IntSupplier requiredSize) throws IOException { - var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); - if (spaceLeft != -1 && spaceLeft < requiredSize.getAsInt()) { + public int currentOutputStreamWriteableSpaceLeft() throws IOException { + // Writeable bytes is the space left minus the space needed to complete the next flush + var maxFieldTagNumberToBeWrittenUponStreamFlush = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER); + var spaceNeededForRecordCreationDuringNextFlush = CodedOutputStream.computeInt32Size(maxFieldTagNumberToBeWrittenUponStreamFlush, numFlushesSoFar + 1); + var outputStreamSpaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); + return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForRecordCreationDuringNextFlush; + } + + /** + * Checks if the current output stream has enough space for the required size and flushes if not. + * This method evaluates the writable space left in the current stream. If the space is insufficient + * for the required size, it triggers a flush operation by calling {@link #flushCommitAndResetStream(boolean)} + * with 'false' to indicate this is not a final operation. If there is adequate space, + * it returns a completed future with null. + * + * @param requiredSize The size required to write to the current stream. 
+ * @return CompletableFuture A future that completes immediately with null if there is enough space, + * or completes with the future returned by flushCommitAndResetStream if a flush is needed. + * @throws IOException if there are I/O errors when checking the stream's space or flushing. + */ + public CompletableFuture flushIfNeeded(int requiredSize) throws IOException { + var spaceLeft = currentOutputStreamWriteableSpaceLeft(); + if (spaceLeft != -1 && spaceLeft < requiredSize) { return flushCommitAndResetStream(false); } return CompletableFuture.completedFuture(null); @@ -163,7 +186,7 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber); final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize; // Ensure space is available before starting an observation - flushIfNeeded(() -> CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)); + flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)); // e.g. 2 { writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER); // Write observation content length @@ -182,11 +205,44 @@ private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOExc } } - private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException { - if (byteBuffer.remaining() > 0) { - getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer); + /** + * Computes the maximum number of writable bytes for a length-delimited field within a given total available space. + * This method takes into account the space required for encoding the length of the field itself, which might reduce + * the writable space due to the encoding overhead. + * + * @param totalAvailableSpace The total available space in bytes that can be used for both the length field and the data. + * @param requestedFieldSize The desired number of writable bytes the caller wishes to use for the data, excluding the length field. + * @return The maximum number of bytes that can be written as data, which may be less than requestedFieldSize because of the space + * consumed by the length field within totalAvailableSpace. If a length-delimited field of the returned size is written and that + * size is less than requestedFieldSize, then at most one byte of available space will remain. In some cases that byte comes from + * a pessimistic lengthFieldSpace estimate, where an optimal calculation would have returned a size one byte greater; in other + * cases the returned size sits at the boundary where lengthFieldSpace grows, so returning one more data byte would require two + * additional bytes of space to write. + */ + public static int computeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedFieldSize) { + // A pessimistic calculation of the space required for the length field: the varint is sized for totalAvailableSpace rather + // than for the (smaller) data length that excludes the length field itself. This may yield a length space one byte greater + // than optimal, potentially leaving one length-delimited byte unused that the availableSpace actually had room for.
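+ // Worked example: with totalAvailableSpace = 128, computeUInt32SizeNoTag(128) is 2, so 126 data bytes are returned + // (1 length byte + 126 payload = 127, leaving one byte unused) even though 127 data bytes would have fit exactly. + // With totalAvailableSpace = 129 the leftover byte is unavoidable: 127 data bytes use 128 total, and a 128th data + // byte would push the length field to 2 bytes, needing 130 in total.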
+ final int pessimisticLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace); + int maxWriteBytesSpace = totalAvailableSpace - pessimisticLengthFieldSpace; + return Math.min(maxWriteBytesSpace, requestedFieldSize); + } + private void readByteBufIntoCurrentStream(int fieldNum, ByteBuf buf) throws IOException { + var codedOutputStream = getOrCreateCodedOutputStream(); + final int bufReadableLength = buf.readableBytes(); + if (bufReadableLength > 0) { + // Here we are optimizing to reduce the number of internal copies and merges performed on the netty + // ByteBuf to write to the CodedOutputStream especially in cases of Composite and Direct ByteBufs. We + // can do this by delegating the individual ByteBuffer writes to netty which will retain the underlying + // structure. We also need to be careful with the exact CodedOutputStream operations performed to ensure + // we write until hitting ByteBuf/ByteBuffer writerIndex/limit and not their capacity since some + // CodedOutputStream operations write until capacity, e.g. CodedOutputStream::writeByteBuffer + codedOutputStream.writeTag(fieldNum, WireFormat.WIRETYPE_LENGTH_DELIMITED); + codedOutputStream.writeUInt32NoTag(bufReadableLength); + buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), bufReadableLength); + assert buf.readableBytes() == 0 : "Expected buf bytes read but instead left " + buf.readableBytes() + " unread."; } else { - getOrCreateCodedOutputStream().writeUInt32NoTag(0); + codedOutputStream.writeUInt32NoTag(0); } } @@ -199,6 +255,17 @@ private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOE } } + /** + * Writes a record to the stream, flushes it, and begins its closure. This method synchronously sets up + * the closing process of the underlying stream and prepares the CodedOutputStreamHolder to return a new stream on next retrieval. + * Each invocation writes a record to signal the current state: the final chunk if 'isFinal' is true, + * otherwise a continuation. Returns a CompletableFuture that resolves upon the stream's closure. + * + * @param isFinal Indicates if this should be the final operation on the stream. + * @return CompletableFuture A future that completes when the stream is closed. Returns null if already closed or no stream exists and 'isFinal' is false. + * @throws IOException if there are I/O errors during the operation. + */ + @Override public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { if (streamHasBeenClosed || (currentCodedOutputStreamHolderOrNull == null && !isFinal)) { @@ -276,8 +343,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, writeByteStringToCurrentStream(dataFieldNumber, str); } - private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buffer) throws IOException { - var byteBuffer = buffer.nioBuffer(); + private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buf) throws IOException { int segmentFieldNumber; int segmentDataFieldNumber; if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) { @@ -293,54 +359,53 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant // the potentially required bytes for simplicity. 
This could leave ~5 bytes of unused space in the CodedOutputStream // when considering the case of a message that does not need segments or for the case of a smaller segment created // from a much larger message - int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, - segmentFieldNumber, segmentDataFieldNumber, byteBuffer); - int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); + final int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, + segmentFieldNumber, segmentDataFieldNumber, buf); + final int dataSize = CodedOutputStreamSizeUtil.computeByteBufRemainingSizeNoTag(buf); + final int trafficStreamOverhead = messageAndOverheadBytesLeft - dataSize; - // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary. - flushIfNeeded(() -> (trafficStreamOverhead + 1)); + // Writing one data byte requires two bytes to account for length byte + final int maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte = trafficStreamOverhead + 2; + flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte); + var spaceLeft = currentOutputStreamWriteableSpaceLeft(); + + var bufToRead = buf.duplicate(); // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue - var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); - if (byteBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) { + if (bufToRead.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) { int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft; - addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, byteBuffer); + addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, bufToRead); observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber); - return; - } - - while(byteBuffer.position() < byteBuffer.limit()) { - // COS checked for unbounded limit above - int availableCOSSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); - int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? 
availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position(); - ByteBuffer bb = byteBuffer.slice(); - bb.limit(chunkBytes); - bb = bb.slice(); - byteBuffer.position(byteBuffer.position() + chunkBytes); - addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bb); - int minExpectedSpaceAfterObservation = availableCOSSpace - chunkBytes - trafficStreamOverhead; - observationSizeSanityCheck(minExpectedSpaceAfterObservation, segmentFieldNumber); - // 1 to N-1 chunked messages - if (byteBuffer.position() < byteBuffer.limit()) { - flushCommitAndResetStream(false); - messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes; + } else { + while(bufToRead.readableBytes() > 0) { + spaceLeft = currentOutputStreamWriteableSpaceLeft(); + var bytesToRead = computeMaxLengthDelimitedFieldSizeForSpace(spaceLeft - trafficStreamOverhead, bufToRead.readableBytes()); + if (bytesToRead <= 0) { + throw new IllegalStateException("Stream space is not allowing forward progress on byteBuf reading"); + } + var bufSliceToRead = bufToRead.readSlice(bytesToRead); + addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bufSliceToRead); + if (bufToRead.readableBytes() > 0) { + flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte); + } } + writeEndOfSegmentMessage(timestamp); } - writeEndOfSegmentMessage(timestamp); - } private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount, - Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException { - int dataSize = 0; + Instant timestamp, ByteBuf byteBuf) throws IOException { + int dataBytesSize = 0; + int dataTagSize = 0; + int dataSize = dataBytesSize + dataTagSize; int segmentCountSize = 0; int captureClosureLength = 1; CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream(); if (dataCountFieldNumber > 0) { segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount); } - if (byteBuffer.remaining() > 0) { - dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, byteBuffer); + if (byteBuf.readableBytes() > 0) { + dataSize = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(dataFieldNumber, byteBuf); captureClosureLength = CodedOutputStream.computeInt32SizeNoTag(dataSize + segmentCountSize); } beginSubstreamObservation(timestamp, captureFieldNumber, captureClosureLength + dataSize + segmentCountSize); @@ -355,7 +420,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in codedOutputStream.writeInt32(dataCountFieldNumber, dataCount); } // Write data field - writeByteBufferToCurrentStream(dataFieldNumber, byteBuffer); + readByteBufIntoCurrentStream(dataFieldNumber, byteBuf); if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER || captureFieldNumber == TrafficObservation.READSEGMENT_FIELD_NUMBER) { this.readObservationsAreWaitingForEom = true; @@ -363,8 +428,8 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in } private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, - java.nio.ByteBuffer byteBuffer) throws IOException { - addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuffer); + ByteBuf byteBuf) throws IOException { + addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuf); } @Override @@ -467,11 +532,53 @@ private void writeEndOfSegmentMessage(Instant timestamp) throws IOException { } private 
void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException { - int actualRemainingSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); + int actualRemainingSpace = currentOutputStreamWriteableSpaceLeft(); if (actualRemainingSpace != -1 && (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0)) { log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " + "at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1), actualRemainingSpace, minExpectedSpaceAfterObservation); } } + + private static class ByteOutputGatheringByteChannel implements GatheringByteChannel { + final ByteOutput byteOutput; + + public ByteOutputGatheringByteChannel(ByteOutput byteOutput) { + this.byteOutput = byteOutput; + } + + @Override + public int write(ByteBuffer src) throws IOException { + var bytesToWrite = src.remaining(); + byteOutput.write(src); + return bytesToWrite - src.remaining(); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws UncheckedIOException { + return IntStream.range(offset, offset + length) + .mapToLong(i -> { + try { + return write(srcs[i]); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .sum(); + } + + @Override + public long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + @Override + public boolean isOpen() { + return true; + } + + @Override + public void close() { + } + } } diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtilTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtilTest.java new file mode 100644 index 000000000..29198ef1e --- /dev/null +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtilTest.java @@ -0,0 +1,105 @@ +package org.opensearch.migrations.trafficcapture; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import java.time.Instant; + +class CodedOutputStreamSizeUtilTest { + + @Test + void testGetSizeOfTimestamp() { + // Timestamp with only seconds (no explicit nanoseconds) + Instant timestampSecondsOnly = Instant.parse("2024-01-01T00:00:00Z"); + int sizeSecondsOnly = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampSecondsOnly); + Assertions.assertEquals( 6, sizeSecondsOnly); + + // Timestamp with both seconds and nanoseconds + Instant timestampWithNanos = Instant.parse("2024-12-31T23:59:59.123456789Z"); + int sizeWithNanos = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampWithNanos); + Assertions.assertEquals( 11, sizeWithNanos); + } + + @Test + void testMaxBytesNeededForASegmentedObservation() { + Instant timestamp = Instant.parse("2024-01-01T00:00:00Z"); + ByteBuf buf = Unpooled.buffer(100); + buf.writeCharSequence("Test", StandardCharsets.UTF_8); + int result = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, 1, 2, buf); + Assertions.assertEquals(24, result); + } + + @Test + void test_computeByteBufRemainingSize_emptyBufWithCapacity() { + var buf = Unpooled.directBuffer(100); + int result = 
CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf); + Assertions.assertEquals(2, result); + } + + @Test + void test_computeByteBufRemainingSize() { + var buf = Unpooled.directBuffer(); + buf.writeCharSequence("hello_test", StandardCharsets.UTF_8); + int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf); + Assertions.assertEquals(12, result); + } + + @Test + void test_computeByteBufRemainingSize_largeBuf() { + var buf = Unpooled.directBuffer(); + buf.writeCharSequence("1234567890".repeat(100000), StandardCharsets.UTF_8); + int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf); + Assertions.assertEquals(1000004, result); + } + + + @Test + void test_computeByteBufRemainingSize_ByteBufAtCapacity() { + ByteBuf buf = Unpooled.buffer(4); + buf.writeCharSequence("Test", StandardCharsets.UTF_8); + int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf); + Assertions.assertEquals(6, result); + } + + @Test + void test_computeByteBufRemainingSize_EmptyByteBuf() { + ByteBuf buf = Unpooled.buffer(0, 0); + int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf); + Assertions.assertEquals(2, result); + } + + @Test + void testBytesNeededForObservationAndClosingIndex() { + int observationContentSize = 50; + int numberOfTrafficStreamsSoFar = 10; + + int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar); + Assertions.assertEquals(54, result); + } + + @Test + void testBytesNeededForObservationAndClosingIndex_WithZeroContent() { + int observationContentSize = 0; + int numberOfTrafficStreamsSoFar = 0; + + int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar); + Assertions.assertEquals(4, result); + } + + @Test + void testBytesNeededForObservationAndClosingIndex_VariousIndices() { + int observationContentSize = 20; + + // Test with increasing indices to verify scaling of index size + int[] indices = new int[]{1, 1000, 100000}; + int[] expectedResults = new int[]{24, 25, 26}; + + for (int i = 0; i < indices.length; i++) { + int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, indices[i]); + Assertions.assertEquals(expectedResults[i], result); + } + } + +} diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 4ea9f551b..83445d4c7 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -1,7 +1,5 @@ package org.opensearch.migrations.trafficcapture; -import static org.junit.jupiter.api.Assertions.assertThrows; - import com.google.protobuf.ByteString; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; @@ -24,6 +22,10 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; +import 
org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializerTest.StreamManager.NullStreamManager; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; @@ -33,8 +35,10 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; +import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation; @Slf4j +@WrapWithNettyLeakDetection(repetitions = 4) class StreamChannelConnectionCaptureSerializerTest { public static final String TEST_TRAFFIC_STREAM_ID_STRING = "Test"; @@ -150,6 +154,74 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution Assertions.assertEquals(packetData, reconstructedData.toString()); } + @Test + public void testWriteObservationWithSpaceForOneByteAtATime() throws IOException, ExecutionException, InterruptedException { + var packetData = FAKE_READ_PACKET_DATA.repeat(5); + byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); + var numberOfChunks = packetBytes.length; + + var outputBuffersCreated = new ConcurrentLinkedQueue(); + + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING); + var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, packetData.length()); + + var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, + TrafficObservation.WRITESEGMENT_FIELD_NUMBER, + WriteSegmentObservation.DATA_FIELD_NUMBER, + Unpooled.wrappedBuffer(packetBytes,0,packetBytes.length / numberOfChunks)) + streamOverheadBytes + spaceNeededForFlush; + + assert CodedOutputStreamSizeUtil + .computeByteBufRemainingSizeNoTag(Unpooled.wrappedBuffer(packetBytes,0,packetBytes.length / numberOfChunks)) == 2; + + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks); + + serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes)); + + var future = serializer.flushCommitAndResetStream(true); + future.get(); + + List observations = new ArrayList<>(); + for (ByteBuffer buffer : outputBuffersCreated) { + var trafficStream = TrafficStream.parseFrom(buffer); + observations.add(trafficStream.getSubStream(0)); + } + + StringBuilder reconstructedData = new StringBuilder(); + for (TrafficObservation observation : observations) { + var stringChunk = observation.getWriteSegment().getData().toStringUtf8(); + reconstructedData.append(stringChunk); + } + Assertions.assertEquals(packetData, reconstructedData.toString()); + // Expect extra observation for EndOfSegmentMessage when chunked + Assertions.assertEquals(numberOfChunks + 1, observations.size()); + } + + @Test + public void testExceptionWhenStreamTooSmallForObservation() throws IOException, ExecutionException, InterruptedException { + byte[] packetBytes = new byte[1]; + var outputBuffersCreated = new ConcurrentLinkedQueue(); + + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + 
CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING); + var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, 1); + + var bufferSizeWithoutSpaceForBytes = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, + TrafficObservation.WRITESEGMENT_FIELD_NUMBER, + WriteSegmentObservation.DATA_FIELD_NUMBER, + Unpooled.buffer()) + streamOverheadBytes + spaceNeededForFlush; + + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeWithoutSpaceForBytes); + + Assertions.assertThrows(IllegalStateException.class, () -> { + serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes)); + }); + + var future = serializer.flushCommitAndResetStream(true); + future.get(); + } + + @Test public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded() throws IOException, ExecutionException, InterruptedException { @@ -197,6 +269,77 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream() Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size()); } + @ParameterizedTest + @ValueSource(ints = {1, 16}) + public void testWriteIsHandledForBufferWithNioBufAllocatedLargerThanWritten(int numberOfChunks) + throws IOException, ExecutionException, InterruptedException { + // Use ByteBuf that returns nioBuffer with limit < capacity this can cause some edge cases to trigger depending + // on specific CodedOutputStream apis used + var outputBuffersCreated = new ConcurrentLinkedQueue(); + + var dataBytes = FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8); + + Assertions.assertTrue(dataBytes.length >= numberOfChunks); + + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING) + 2; + + var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, 1,2, + Unpooled.wrappedBuffer(dataBytes,0,dataBytes.length / numberOfChunks)) + streamOverheadBytes; + + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks); + + var readOnlyBuf = createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(dataBytes); + + serializer.addWriteEvent(REFERENCE_TIMESTAMP, readOnlyBuf); + + var future = serializer.flushCommitAndResetStream(true); + future.get(); + + List observations = new ArrayList<>(); + for (ByteBuffer buffer : outputBuffersCreated) { + var trafficStream = TrafficStream.parseFrom(buffer); + observations.add(trafficStream.getSubStream(0)); + } + + StringBuilder reconstructedData = new StringBuilder(); + for (TrafficObservation observation : observations) { + var stringChunk = ((numberOfChunks == 1) ? observation.getWrite().getData() + : observation.getWriteSegment().getData()).toStringUtf8(); + reconstructedData.append(stringChunk); + } + Assertions.assertEquals(FAKE_READ_PACKET_DATA, reconstructedData.toString()); + // Expect extra observation for EndOfSegmentMessage when chunked + var expectedObservations = numberOfChunks > 1 ? 
numberOfChunks + 1 : numberOfChunks; + Assertions.assertEquals(expectedObservations, observations.size()); + Assertions.assertEquals(0, readOnlyBuf.readerIndex()); + } + + public static io.netty.buffer.ByteBuf createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(byte[] dataBytes) { + // Force creation of a ByteBuf that returns nioBuffer with limit < capacity. + // This can cause some edge cases to trigger during interaction with underlying ByteBuffers + final int spaceBetweenLimitAndCapacity = 100; + final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(dataBytes.length + spaceBetweenLimitAndCapacity); + byteBuffer.put(dataBytes); + byteBuffer.position(dataBytes.length + spaceBetweenLimitAndCapacity); + byteBuffer.flip(); + + io.netty.buffer.ByteBuf buf = Unpooled.wrappedBuffer(byteBuffer.asReadOnlyBuffer()); + buf.writerIndex(buf.writerIndex() - spaceBetweenLimitAndCapacity); + + // Double check ByteBuf behaves as expected + var nioBuf = buf.nioBuffer(); + Assertions.assertEquals(0, nioBuf.position()); + Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, nioBuf.capacity()); + Assertions.assertEquals(dataBytes.length, nioBuf.limit()); + + Assertions.assertEquals(dataBytes.length, buf.readableBytes()); + Assertions.assertEquals(0, buf.readerIndex()); + Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, buf.capacity()); + + return buf; + } + @Test public void testWithLimitlessCodedOutputStreamHolder() throws IOException, ExecutionException, InterruptedException { @@ -324,11 +467,113 @@ public void testAssertionErrorDuringInitializationWhenInitializeWithTooLargeId() final String tooLargeNodeId = 'a' + realNodeId; var outputBuffersCreated = new ConcurrentLinkedQueue(); - assertThrows(AssertionError.class, + Assertions.assertThrows(AssertionError.class, () -> new StreamChannelConnectionCaptureSerializer<>("a" + tooLargeNodeId, realKafkaConnectionId, new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated))); } + @ParameterizedTest + @CsvSource(value = { + // Test with totalAvailableSpace >= requestedWriteableSpace + "2,2", + "3,2", + "4,2", + + // Test around length where length bytes increases with totalAvailableSpace < requestedWriteableSpace + "1,10", + "2,10", + "3,10", + + "127,10000000", + "128,10000000", // 2^7 + "129,10000000", + "130,10000000", + "131,10000000", + "132,10000000", + + "16381,10000000", + "16382,10000000", + "16383,10000000", + "16384,10000000", // 2^14 + "16385,10000000", + "16386,10000000", + "16387,10000000", + "16388,10000000", + "16389,10000000", + + "2097150,10000000", + "2097151,10000000", + "2097152,10000000", // 2^21 + "2097153,10000000", + "2097154,10000000", + "2097155,10000000", + "2097156,10000000", + "2097157,10000000", + "2097158,10000000", + + "268435455,100000000", + "268435456,100000000", // 2^28 + "268435457,100000000", + "268435458,100000000", + "268435459,100000000", + "268435460,100000000", + "268435461,100000000", + "268435462,100000000" + }) + public void test_computeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedWriteableSpace) { + var optimalMaxWriteableSpace = optimalComputeMaxLengthDelimitedFieldSizeForSpace(totalAvailableSpace, + requestedWriteableSpace); + + Assertions.assertTrue(optimalMaxWriteableSpace <= requestedWriteableSpace, "cannot write more bytes than requested"); + + var spaceLeftAfterWrite = totalAvailableSpace + - CodedOutputStream.computeInt32SizeNoTag(optimalMaxWriteableSpace) + - optimalMaxWriteableSpace; + 
Assertions.assertTrue(spaceLeftAfterWrite >= 0, "expected non-negative space left"); + if (optimalMaxWriteableSpace < requestedWriteableSpace) { + Assertions.assertTrue(spaceLeftAfterWrite <= 1, "expected space left to be no more than 1 if" + + " not enough space for requestedWriteableSpace"); + } + + if (optimalMaxWriteableSpace < requestedWriteableSpace) { + // If we are not writing all of requestedWriteableSpace, verify there is no space for one more byte + var expectedSpaceLeftAfterWriteIfOneMoreByteWritten = totalAvailableSpace + - CodedOutputStream.computeInt32SizeNoTag(optimalMaxWriteableSpace + 1) + - (optimalMaxWriteableSpace + 1); + Assertions.assertTrue(expectedSpaceLeftAfterWriteIfOneMoreByteWritten < 0, "expected no space to write one more byte"); + } + + // Test that when maxWriteableSpace != optimalMaxWriteableSpace, then + // it is positive and equal to optimalMaxWriteableSpace - 1 + var maxWriteableSpace = StreamChannelConnectionCaptureSerializer.computeMaxLengthDelimitedFieldSizeForSpace(totalAvailableSpace, + requestedWriteableSpace); + if (maxWriteableSpace != optimalMaxWriteableSpace) { + Assertions.assertTrue(maxWriteableSpace > 0); + Assertions.assertEquals(optimalMaxWriteableSpace - 1, maxWriteableSpace); + var spaceLeftIfWritten = totalAvailableSpace - CodedOutputStream.computeInt32SizeNoTag(maxWriteableSpace) - maxWriteableSpace; + if (maxWriteableSpace < requestedWriteableSpace) { + Assertions.assertTrue(spaceLeftIfWritten <= 1, "expected pessimistic space left to be no more than 1 if" + + " not enough space for requestedWriteableSpace"); + } + } + } + + // Optimally calculates maxWriteableSpace taking into account lengthSpace. In some cases, this may + // yield a maxWriteableSpace with one leftover byte; however, this is still the correct max, as attempting to write + // the additional byte would add an additional length byte and overflow + public static int optimalComputeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedWriteableSpace) { + // Overestimate the lengthFieldSpace first to then correct in instances where writing one more byte does not result + // in as large a lengthFieldSpace. In instances where we must have a leftoverByte, it will be in the lengthFieldSpace + final int maxLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace); + final int initialEstimatedMaxWriteSpace = totalAvailableSpace - maxLengthFieldSpace; + final int lengthFieldSpaceForInitialEstimatedAndOneMoreByte = CodedOutputStream.computeUInt32SizeNoTag(initialEstimatedMaxWriteSpace + 1); + + int maxWriteBytesSpace = totalAvailableSpace - lengthFieldSpaceForInitialEstimatedAndOneMoreByte; + + return Math.min(maxWriteBytesSpace, requestedWriteableSpace); + } + @Test public void testInitializationWithRealIds() { final String realNodeId = "b671d2f2-577b-414e-9eb4-8bc3e89ee182"; @@ -370,18 +615,23 @@ protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder out "Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder; - log.trace("Getting ready to flush for " + osh); - log.trace("Bytes written so far... " + StandardCharsets.UTF_8.decode(osh.getByteBuffer().duplicate())); + log.atTrace().setMessage("Getting ready to flush for {}").addArgument(osh).log(); + log.atTrace().setMessage("Bytes written so far... 
{}") + .addArgument(() -> StandardCharsets.UTF_8.decode(osh.getByteBuffer().duplicate())).log(); return CompletableFuture.runAsync(() -> { try { osh.getOutputStream().flush(); - log.trace("Just flushed for " + osh.getOutputStream()); + log.atTrace().setMessage("Just flushed for {}") + .addArgument(osh.getOutputStream()) + .log(); var bb = osh.getByteBuffer(); bb.position(0); var bytesWritten = osh.getOutputStream().getTotalBytesWritten(); bb.limit(bytesWritten); - log.trace("Adding " + StandardCharsets.UTF_8.decode(bb.duplicate())); + log.atTrace().setMessage("Adding {}") + .addArgument(() -> StandardCharsets.UTF_8.decode(bb.duplicate())) + .log(); outputBuffers.add(bb); } catch (IOException e) { throw Lombok.sneakyThrow(e); diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index ac35777b2..80b2aad91 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -30,6 +30,7 @@ import java.util.stream.Stream; @Slf4j +@WrapWithNettyLeakDetection public class ConditionallyReliableLoggingHttpHandlerTest { private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer channelWriter, @@ -46,10 +47,11 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer new ByteBufInputStream((ByteBuf) m, true)) - .collect(Collectors.toList()))) - .readAllBytes(); + var outputDataStream = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m -> new ByteBufInputStream((ByteBuf) m, false)) + .collect(Collectors.toList()))); + var outputData = outputDataStream.readAllBytes(); + outputDataStream.close(); Assertions.assertArrayEquals(fullTrafficBytes, outputData); Assertions.assertNotNull(streamManager.byteBufferAtomicReference.get(), @@ -70,6 +72,9 @@ private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer offloader, headerCapturePredicate, x -> true)); getWriter(false, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + channel.finishAndReleaseAll(); channel.close(); var requestBytes = SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8); @@ -160,7 +166,7 @@ public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throw // we wrote the correct data to the downstream handler/channel var consumedData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() - .map(m -> new ByteBufInputStream((ByteBuf) m)) + .map(m -> new ByteBufInputStream((ByteBuf) m, false)) .collect(Collectors.toList()))) .readAllBytes(); log.info("captureddata = " + new String(consumedData, StandardCharsets.UTF_8)); @@ -198,6 +204,8 @@ public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throw new String(reconstitutedTrafficStreamWrites, StandardCharsets.UTF_8)); Assertions.assertArrayEquals(bytesForResponsePreserved, reconstitutedTrafficStreamWrites); } + + channel.finishAndReleaseAll(); } } @@ -220,7 +228,6 @@ private Consumer getWriter(boolean singleBytes, boolean usePool @ParameterizedTest @ValueSource(booleans = {true, false}) - 
@WrapWithNettyLeakDetection(repetitions = 16) public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInTinyPacketsBlocksFutureActivity(usePool, false); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 66567a7dc..6796fa561 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -1,9 +1,11 @@ package org.opensearch.migrations.replay; import io.netty.buffer.ByteBuf; +import io.netty.handler.codec.base64.Base64; +import io.netty.handler.codec.base64.Base64Dialect; import io.netty.handler.codec.http.HttpHeaders; +import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.Base64; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -100,18 +102,15 @@ public static void fillStatusCodeMetrics(@NonNull IReplayContexts.ITupleHandling targetResponseOp.ifPresent(r -> context.setTargetStatus((Integer) r.get(STATUS_CODE_KEY))); } - private static byte[] getBytesFromByteBuf(ByteBuf buf) { - var bytes = new byte[buf.readableBytes()]; - buf.getBytes(buf.readerIndex(), bytes); - return bytes; - } - private static Map fillMap(LinkedHashMap map, HttpHeaders headers, ByteBuf content) { - String base64body = Base64.getEncoder().encodeToString(getBytesFromByteBuf(content)); - map.put("body", base64body); - headers.entries().stream().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue())); - return map; + try (var encodedBufHolder = RefSafeHolder.create(Base64.encode(content, false, Base64Dialect.STANDARD))) { + var encodedBuf = encodedBufHolder.get(); + assert encodedBuf != null : "Base64.encode should not return null"; + headers.entries().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue())); + map.put("body", encodedBuf.toString(StandardCharsets.UTF_8)); + return map; + } } private static Map makeSafeMap(@NonNull IReplayContexts.ITupleHandlingContext context, diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java index 3bdb529b6..d6c969559 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/HttpJsonTransformingConsumer.java @@ -90,17 +90,13 @@ private HttpRequestDecoder getHttpRequestDecoderHandler() { @Override public TrackedFuture consumeBytes(ByteBuf nextRequestPacket) { - chunks.add(nextRequestPacket.duplicate().readerIndex(0).retain()); + chunks.add(nextRequestPacket.retainedDuplicate()); chunkSizes.get(chunkSizes.size() - 1).add(nextRequestPacket.readableBytes()); - if (log.isTraceEnabled()) { - byte[] copy = new byte[nextRequestPacket.readableBytes()]; - nextRequestPacket.duplicate().readBytes(copy); - log.trace("HttpJsonTransformingConsumer[" + this + "]: writing into embedded channel: " - + new String(copy, 
StandardCharsets.UTF_8)); - } + log.atTrace().log(() -> "HttpJsonTransformingConsumer[" + this + "]: writing into embedded channel: " + + nextRequestPacket.toString(StandardCharsets.UTF_8)); return TextTrackedFuture.completedFuture(null, ()->"initialValue") - .map(cf->cf.thenAccept(x -> channel.writeInbound(nextRequestPacket)), - ()->"HttpJsonTransformingConsumer sending bytes to its EmbeddedChannel"); + .map(cf->cf.thenAccept(x -> channel.writeInbound(nextRequestPacket)), + ()->"HttpJsonTransformingConsumer sending bytes to its EmbeddedChannel"); } public TrackedFuture> finalizeRequest() { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java index 31ceebc53..e0073f7c3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java @@ -4,19 +4,22 @@ import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.HttpContent; import io.netty.handler.codec.http.LastHttpContent; +import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.transform.IAuthTransformer; import java.util.ArrayList; import java.util.List; +@Slf4j public class NettyJsonContentAuthSigner extends ChannelInboundHandlerAdapter { IAuthTransformer.StreamingFullMessageTransformer signer; HttpJsonMessageWithFaultingPayload httpMessage; - List receivedHttpContents; + List httpContentsBuffer; public NettyJsonContentAuthSigner(IAuthTransformer.StreamingFullMessageTransformer signer) { this.signer = signer; - this.receivedHttpContents = new ArrayList<>(); + this.httpContentsBuffer = new ArrayList<>(); + httpMessage = null; } @Override @@ -24,23 +27,46 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (msg instanceof HttpJsonMessageWithFaultingPayload) { httpMessage = (HttpJsonMessageWithFaultingPayload) msg; } else if (msg instanceof HttpContent) { - receivedHttpContents.add(((HttpContent) msg).retainedDuplicate()); var httpContent = (HttpContent) msg; + httpContentsBuffer.add(httpContent); signer.consumeNextPayloadPart(httpContent.content().nioBuffer()); if (msg instanceof LastHttpContent) { - finalizeSignature(ctx); + signer.finalizeSignature(httpMessage); + flushDownstream(ctx); } } else { super.channelRead(ctx, msg); } } - private void finalizeSignature(ChannelHandlerContext ctx) { - signer.finalizeSignature(httpMessage); - ctx.fireChannelRead(httpMessage); - receivedHttpContents.stream().forEach(content->{ - ctx.fireChannelRead(content); - content.content().release(); - }); + private boolean flushDownstream(ChannelHandlerContext ctx) { + boolean messageFlushed = httpMessage != null || !httpContentsBuffer.isEmpty(); + if (httpMessage != null) { + ctx.fireChannelRead(httpMessage); + httpMessage = null; + } + httpContentsBuffer.forEach(ctx::fireChannelRead); + httpContentsBuffer.clear(); + return messageFlushed; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + boolean messageFlushed = flushDownstream(ctx); + if (messageFlushed) { + log.atWarn().setMessage(() -> "Failed to sign message because the handler was removed" + + " before all of the HTTP contents were received").log(); + } + 
super.handlerRemoved(ctx); + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + boolean messageFlushed = flushDownstream(ctx); + if (messageFlushed) { + log.atWarn().setMessage(() -> "Failed to sign message because the channel was unregistered" + + " before all of the HTTP contents were received").log(); + } + super.channelUnregistered(ctx); } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentCompressor.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentCompressor.java index bb9525cfd..7f7d7bd6c 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentCompressor.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentCompressor.java @@ -58,12 +58,16 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } else if (msg instanceof HttpContent) { if (compressorStream != null) { var contentBuf = ((HttpContent) msg).content(); - contentBuf.readBytes(compressorStream, contentBuf.readableBytes()); - if (msg instanceof LastHttpContent) { - closeStream(); - ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); + try { + contentBuf.readBytes(compressorStream, contentBuf.readableBytes()); + if (msg instanceof LastHttpContent) { + closeStream(); + ctx.fireChannelRead(LastHttpContent.EMPTY_LAST_CONTENT); + } + return; // fireChannelRead will be fired on the compressed contents via the compressorStream. + } finally { + contentBuf.release(); } - return; // fireChannelRead will be fired on the compressed contents via the compressorStream. 
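+ // Note: releasing contentBuf in the finally block guarantees the HttpContent's buffer is freed even if writing to the compressorStream throws mid-read.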
} else { assert (bufferedOutputStream == null && passDownstreamOutputStream == null) : "Expected contents with data to be passed through the compression stream before it was closed OR " + diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TransformedPackets.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TransformedPackets.java index 58bfa3291..b4f052c7b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TransformedPackets.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datatypes/TransformedPackets.java @@ -2,6 +2,7 @@ import io.netty.buffer.ByteBuf; +import io.netty.util.ReferenceCounted; import java.util.ArrayList; import java.util.StringJoiner; import java.util.stream.Stream; @@ -36,7 +37,7 @@ public Stream asByteArrayStream() { @Override public void close() { - data.stream().forEach(bb->bb.release()); + data.forEach(ReferenceCounted::release); // Once we're closed, I'd rather see an NPE rather than refCnt errors from netty, which // could cause us to look in many places before finding out that it was just localize // to how callers were handling this object diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideSnifferHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideSnifferHandler.java index 3985a10b3..383910b9d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideSnifferHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideSnifferHandler.java @@ -25,9 +25,8 @@ public void channelActive(ChannelHandlerContext ctx) { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { var bb = (ByteBuf) msg; byte[] output = new byte[bb.readableBytes()]; - bb.readBytes(output); + bb.duplicate().readBytes(output); aggregatedRawResponseBuilder.addResponsePacket(output); - bb.resetReaderIndex(); ctx.fireChannelRead(msg); } } \ No newline at end of file diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 598585854..ac55a3d7d 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -210,7 +210,7 @@ public void testThatSchedulingWorks() throws Exception { var body = response.content(); Assertions.assertEquals(TestHttpServerContext.SERVER_RESPONSE_BODY_PREFIX + TestHttpServerContext.getUriForIthRequest(i / NUM_REPEATS), - body.duplicate().toString(StandardCharsets.UTF_8)); + body.toString(StandardCharsets.UTF_8)); } } closeFuture.get(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 72e1f63cc..6d623088e 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ 
b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -17,13 +17,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.impl.Log4jContextFactory; +import org.apache.logging.log4j.core.selector.ClassLoaderContextSelector; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.ResourceLock; import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus; import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKeyAndContext; import org.opensearch.migrations.replay.datatypes.TransformedPackets; -import org.opensearch.migrations.replay.datatypes.UniqueReplayerRequestKey; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; import org.opensearch.migrations.tracing.TestContext; @@ -32,6 +33,10 @@ @Slf4j @WrapWithNettyLeakDetection(repetitions = 4) class ResultsToLogsConsumerTest extends InstrumentationTest { + static { + // Synchronize logging for assertions + LogManager.setFactory(new Log4jContextFactory(new ClassLoaderContextSelector())); + } private static final String NODE_ID = "n"; private static final ObjectMapper mapper = new ObjectMapper(); public static final String TEST_EXCEPTION_MESSAGE = "TEST_EXCEPTION"; @@ -153,11 +158,11 @@ public void testOutputterForGet() throws IOException { " \"Request-URI\": \"/test\",\r\n" + " \"Method\": \"GET\",\r\n" + " \"HTTP-Version\": \"HTTP/1.1\",\r\n" + - " \"body\": \"\",\r\n" + " \"Host\": \"foo.example\",\r\n" + " \"auTHorization\": \"Basic YWRtaW46YWRtaW4=\",\r\n" + " \"Content-Type\": \"application/json\",\r\n" + - " \"content-length\": \"0\"\r\n" + + " \"content-length\": \"0\",\r\n" + + " \"body\": \"\"\r\n" + " },\r\n" + " \"sourceResponse\": {\r\n" + " \"HTTP-Version\": {\r\n" + @@ -166,22 +171,22 @@ " \"Status-Code\": 200,\r\n" + " \"Reason-Phrase\": \"OK\",\r\n" + " \"response_time_ms\": 0,\r\n" + - " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\",\r\n" + " \"Content-transfer-encoding\": \"chunked\",\r\n" + " \"Date\": \"Thu, 08 Jun 2023 23:06:23 GMT\",\r\n" + " \"Content-type\": \"text/plain\",\r\n" + " \"Funtime\": \"checkIt!\",\r\n" + - " \"content-length\": \"30\"\r\n" + + " \"content-length\": \"30\",\r\n" + + " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\"\r\n" + " },\r\n" + " \"targetRequest\": {\r\n" + " \"Request-URI\": \"/test\",\r\n" + " \"Method\": \"GET\",\r\n" + " \"HTTP-Version\": \"HTTP/1.1\",\r\n" + - " \"body\": \"\",\r\n" + " \"Host\": \"foo.example\",\r\n" + " \"auTHorization\": \"Basic YWRtaW46YWRtaW4=\",\r\n" + " \"Content-Type\": \"application/json\",\r\n" + - " \"content-length\": \"0\"\r\n" + + " \"content-length\": \"0\",\r\n" + + " \"body\": \"\"\r\n" + " },\r\n" + " \"targetResponse\": {\r\n" + " \"HTTP-Version\": {\r\n" + @@ -190,12 +195,12 @@ " \"Status-Code\": 200,\r\n" + " \"Reason-Phrase\": \"OK\",\r\n" + " \"response_time_ms\": 267,\r\n" + - " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\",\r\n" + " \"Content-transfer-encoding\": \"chunked\",\r\n" + " \"Date\": \"Thu, 08 Jun 2023 23:06:23 GMT\",\r\n" + " \"Content-type\": \"text/plain\",\r\n" + " \"Funtime\": \"checkIt!\",\r\n" + - " \"content-length\": 
\"30\"\r\n" + + " \"content-length\": \"30\",\r\n" + + " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\"\r\n" + " },\r\n" + " \"connectionId\": \"testConnection.1\"\r\n" + "}"; @@ -211,10 +216,10 @@ public void testOutputterForPost() throws IOException { " \"Request-URI\": \"/test\",\r\n" + " \"Method\": \"POST\",\r\n" + " \"HTTP-Version\": \"HTTP/1.1\",\r\n" + - " \"body\": \"ew0KICAic2V0dGluZ3MiOiB7DQogICAgImluZGV4Ijogew0KICAgICAgIm51bWJlcl9vZl9zaGFyZHMiOiA3LA0KICAgICAgIm51bWJlcl9vZl9yZXBsaWNhcyI6IDMNCiAgICB9LA0KICAgICJhbmFseXNpcyI6IHsNCiAgICAgICJhbmFseXplciI6IHsNCiAgICAgICAgIm5hbWVBbmFseXplciI6IHsNCiAgICAgICAgICAidHlwZSI6ICJjdXN0b20iLA0KICAgICAgICAgICJ0b2tlbml6ZXIiOiAia2V5d29yZCIsDQogICAgICAgICAgImZpbHRlciI6ICJ1cHBlcmNhc2UiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0sDQogICJtYXBwaW5ncyI6IHsNCiAgICAiZW1wbG95ZWUiOiB7DQogICAgICAicHJvcGVydGllcyI6IHsNCiAgICAgICAgImFnZSI6IHsNCiAgICAgICAgICAidHlwZSI6ICJsb25nIg0KICAgICAgICB9LA0KICAgICAgICAibGV2ZWwiOiB7DQogICAgICAgICAgInR5cGUiOiAibG9uZyINCiAgICAgICAgfSwNCiAgICAgICAgInRpdGxlIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiDQogICAgICAgIH0sDQogICAgICAgICJuYW1lIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiLA0KICAgICAgICAgICJhbmFseXplciI6ICJuYW1lQW5hbHl6ZXIiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0NCn0NCg==\",\r\n" + " \"Host\": \"foo.example\",\r\n" + " \"Content-Type\": \"application/json\",\r\n" + - " \"Content-Length\": \"652\"\r\n" + + " \"Content-Length\": \"652\",\r\n" + + " \"body\": \"ew0KICAic2V0dGluZ3MiOiB7DQogICAgImluZGV4Ijogew0KICAgICAgIm51bWJlcl9vZl9zaGFyZHMiOiA3LA0KICAgICAgIm51bWJlcl9vZl9yZXBsaWNhcyI6IDMNCiAgICB9LA0KICAgICJhbmFseXNpcyI6IHsNCiAgICAgICJhbmFseXplciI6IHsNCiAgICAgICAgIm5hbWVBbmFseXplciI6IHsNCiAgICAgICAgICAidHlwZSI6ICJjdXN0b20iLA0KICAgICAgICAgICJ0b2tlbml6ZXIiOiAia2V5d29yZCIsDQogICAgICAgICAgImZpbHRlciI6ICJ1cHBlcmNhc2UiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0sDQogICJtYXBwaW5ncyI6IHsNCiAgICAiZW1wbG95ZWUiOiB7DQogICAgICAicHJvcGVydGllcyI6IHsNCiAgICAgICAgImFnZSI6IHsNCiAgICAgICAgICAidHlwZSI6ICJsb25nIg0KICAgICAgICB9LA0KICAgICAgICAibGV2ZWwiOiB7DQogICAgICAgICAgInR5cGUiOiAibG9uZyINCiAgICAgICAgfSwNCiAgICAgICAgInRpdGxlIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiDQogICAgICAgIH0sDQogICAgICAgICJuYW1lIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiLA0KICAgICAgICAgICJhbmFseXplciI6ICJuYW1lQW5hbHl6ZXIiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0NCn0NCg==\"\r\n" + " },\r\n" + " \"sourceResponse\": {\r\n" + " \"HTTP-Version\": {\r\n" + @@ -223,21 +228,21 @@ public void testOutputterForPost() throws IOException { " \"Status-Code\": 200,\r\n" + " \"Reason-Phrase\": \"OK\",\r\n" + " \"response_time_ms\": 0,\r\n" + - " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\",\r\n" + " \"Content-transfer-encoding\": \"chunked\",\r\n" + " \"Date\": \"Thu, 08 Jun 2023 23:06:23 GMT\",\r\n" + " \"Content-type\": \"text/plain\",\r\n" + " \"Funtime\": \"checkIt!\",\r\n" + - " \"content-length\": \"30\"\r\n" + + " \"content-length\": \"30\",\r\n" + + " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\"\r\n" + " },\r\n" + " \"targetRequest\": {\r\n" + " \"Request-URI\": \"/test\",\r\n" + " \"Method\": \"POST\",\r\n" + " \"HTTP-Version\": \"HTTP/1.1\",\r\n" + - " \"body\": 
\"ew0KICAic2V0dGluZ3MiOiB7DQogICAgImluZGV4Ijogew0KICAgICAgIm51bWJlcl9vZl9zaGFyZHMiOiA3LA0KICAgICAgIm51bWJlcl9vZl9yZXBsaWNhcyI6IDMNCiAgICB9LA0KICAgICJhbmFseXNpcyI6IHsNCiAgICAgICJhbmFseXplciI6IHsNCiAgICAgICAgIm5hbWVBbmFseXplciI6IHsNCiAgICAgICAgICAidHlwZSI6ICJjdXN0b20iLA0KICAgICAgICAgICJ0b2tlbml6ZXIiOiAia2V5d29yZCIsDQogICAgICAgICAgImZpbHRlciI6ICJ1cHBlcmNhc2UiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0sDQogICJtYXBwaW5ncyI6IHsNCiAgICAiZW1wbG95ZWUiOiB7DQogICAgICAicHJvcGVydGllcyI6IHsNCiAgICAgICAgImFnZSI6IHsNCiAgICAgICAgICAidHlwZSI6ICJsb25nIg0KICAgICAgICB9LA0KICAgICAgICAibGV2ZWwiOiB7DQogICAgICAgICAgInR5cGUiOiAibG9uZyINCiAgICAgICAgfSwNCiAgICAgICAgInRpdGxlIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiDQogICAgICAgIH0sDQogICAgICAgICJuYW1lIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiLA0KICAgICAgICAgICJhbmFseXplciI6ICJuYW1lQW5hbHl6ZXIiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0NCn0NCg==\",\r\n" + " \"Host\": \"foo.example\",\r\n" + " \"Content-Type\": \"application/json\",\r\n" + - " \"Content-Length\": \"652\"\r\n" + + " \"Content-Length\": \"652\",\r\n" + + " \"body\": \"ew0KICAic2V0dGluZ3MiOiB7DQogICAgImluZGV4Ijogew0KICAgICAgIm51bWJlcl9vZl9zaGFyZHMiOiA3LA0KICAgICAgIm51bWJlcl9vZl9yZXBsaWNhcyI6IDMNCiAgICB9LA0KICAgICJhbmFseXNpcyI6IHsNCiAgICAgICJhbmFseXplciI6IHsNCiAgICAgICAgIm5hbWVBbmFseXplciI6IHsNCiAgICAgICAgICAidHlwZSI6ICJjdXN0b20iLA0KICAgICAgICAgICJ0b2tlbml6ZXIiOiAia2V5d29yZCIsDQogICAgICAgICAgImZpbHRlciI6ICJ1cHBlcmNhc2UiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0sDQogICJtYXBwaW5ncyI6IHsNCiAgICAiZW1wbG95ZWUiOiB7DQogICAgICAicHJvcGVydGllcyI6IHsNCiAgICAgICAgImFnZSI6IHsNCiAgICAgICAgICAidHlwZSI6ICJsb25nIg0KICAgICAgICB9LA0KICAgICAgICAibGV2ZWwiOiB7DQogICAgICAgICAgInR5cGUiOiAibG9uZyINCiAgICAgICAgfSwNCiAgICAgICAgInRpdGxlIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiDQogICAgICAgIH0sDQogICAgICAgICJuYW1lIjogew0KICAgICAgICAgICJ0eXBlIjogInRleHQiLA0KICAgICAgICAgICJhbmFseXplciI6ICJuYW1lQW5hbHl6ZXIiDQogICAgICAgIH0NCiAgICAgIH0NCiAgICB9DQogIH0NCn0NCg==\"\r\n" + " },\r\n" + " \"targetResponse\": {\r\n" + " \"HTTP-Version\": {\r\n" + @@ -246,12 +251,12 @@ public void testOutputterForPost() throws IOException { " \"Status-Code\": 200,\r\n" + " \"Reason-Phrase\": \"OK\",\r\n" + " \"response_time_ms\": 267,\r\n" + - " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\",\r\n" + " \"Content-transfer-encoding\": \"chunked\",\r\n" + " \"Date\": \"Thu, 08 Jun 2023 23:06:23 GMT\",\r\n" + " \"Content-type\": \"text/plain\",\r\n" + " \"Funtime\": \"checkIt!\",\r\n" + - " \"content-length\": \"30\"\r\n" + + " \"content-length\": \"30\",\r\n" + + " \"body\": \"SSBzaG91bGQgYmUgZGVjcnlwdGVkIHRlc3RlciEN\"\r\n" + " },\r\n" + " \"connectionId\": \"testConnection.1\"\r\n" + "}"; diff --git a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestCapturePacketToHttpHandler.java b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestCapturePacketToHttpHandler.java index e8fe4edfd..ec0cde377 100644 --- a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestCapturePacketToHttpHandler.java +++ b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestCapturePacketToHttpHandler.java @@ -42,7 +42,7 @@ public TestCapturePacketToHttpHandler(Duration consumeDuration, public TrackedFuture consumeBytes(ByteBuf nextRequestPacket) { numConsumes.incrementAndGet(); log.info("incoming buffer refcnt="+nextRequestPacket.refCnt()); - var duplicatedPacket = nextRequestPacket.duplicate().retain(); + var duplicatedPacket = 
diff --git a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestUtils.java b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestUtils.java
index 80e11e32f..3f6522eda 100644
--- a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestUtils.java
+++ b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/TestUtils.java
@@ -115,13 +115,10 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) thro
         fullRequest.release();
     }

-    private static String getStringFromContent(FullHttpRequest fullRequest) throws IOException {
-        try (var baos = new ByteArrayOutputStream()) {
-            var bb = fullRequest.content();
-            bb.readBytes(baos, bb.readableBytes());
-            return baos.toString(StandardCharsets.UTF_8);
-        }
+    private static String getStringFromContent(FullHttpRequest fullRequest) {
+        return fullRequest.content().toString(StandardCharsets.UTF_8);
     }
+
     static void runPipelineAndValidate(TestContext rootContext,
                                        IAuthTransformerFactory authTransformer, String extraHeaders,
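The simplified getStringFromContent relies on ByteBuf.toString(Charset), which decodes the readable bytes without moving the reader index, so the intermediate ByteArrayOutputStream and manual copy are unnecessary. A small sketch of the behavior, with illustrative buffer contents:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.charset.StandardCharsets;

    public class ByteBufToStringDemo {
        public static void main(String[] args) {
            ByteBuf content = Unpooled.copiedBuffer("{\"ok\":true}", StandardCharsets.UTF_8);

            // Decodes the readable bytes in place; unlike readBytes(...), the
            // reader index is untouched, so the buffer can still be consumed later.
            String asText = content.toString(StandardCharsets.UTF_8);
            assert asText.equals("{\"ok\":true}");
            assert content.readableBytes() == asText.length(); // nothing consumed (ASCII)

            content.release();
        }
    }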