diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 04d2d02d07..2b93acf742 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -24,6 +24,9 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializerTest.StreamManager.NullStreamManager; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; @@ -33,6 +36,7 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; +import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation; @Slf4j class StreamChannelConnectionCaptureSerializerTest { @@ -150,6 +154,74 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution Assertions.assertEquals(packetData, reconstructedData.toString()); } + @Test + public void testWriteObservationWithSpaceForOneByteAtATime() throws IOException, ExecutionException, InterruptedException { + var packetData = FAKE_READ_PACKET_DATA.repeat(5); + byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); + var numberOfChunks = packetBytes.length; + + var outputBuffersCreated = new ConcurrentLinkedQueue(); + + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING); + var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, packetData.length()); + + var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, + TrafficObservation.WRITESEGMENT_FIELD_NUMBER, + WriteSegmentObservation.DATA_FIELD_NUMBER, + Unpooled.wrappedBuffer(packetBytes,0,packetBytes.length / numberOfChunks)) + streamOverheadBytes + spaceNeededForFlush; + + assert CodedOutputStreamSizeUtil + .computeByteBufRemainingSizeNoTag(Unpooled.wrappedBuffer(packetBytes,0,packetBytes.length / numberOfChunks)) == 2; + + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks); + + serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes)); + + var future = serializer.flushCommitAndResetStream(true); + future.get(); + + List observations = new ArrayList<>(); + for (ByteBuffer buffer : outputBuffersCreated) { + var trafficStream = TrafficStream.parseFrom(buffer); + observations.add(trafficStream.getSubStream(0)); + } + + StringBuilder reconstructedData = new StringBuilder(); + for (TrafficObservation observation : observations) { + var stringChunk = observation.getWriteSegment().getData().toStringUtf8(); + reconstructedData.append(stringChunk); + } + Assertions.assertEquals(packetData, reconstructedData.toString()); + // Expect extra observation for EndOfSegmentMessage when chunked + Assertions.assertEquals(numberOfChunks + 1, observations.size()); + } + + @Test + public void testExceptionWhenStreamTooSmallForObservation() throws IOException, ExecutionException, InterruptedException { + byte[] packetBytes = new byte[1]; + var outputBuffersCreated = new ConcurrentLinkedQueue(); + + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING); + var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, 1); + + var bufferSizeWithoutSpaceForBytes = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, + TrafficObservation.WRITESEGMENT_FIELD_NUMBER, + WriteSegmentObservation.DATA_FIELD_NUMBER, + Unpooled.buffer()) + streamOverheadBytes + spaceNeededForFlush; + + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeWithoutSpaceForBytes); + + Assertions.assertThrows(IllegalStateException.class, () -> { + serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes)); + }); + + var future = serializer.flushCommitAndResetStream(true); + future.get(); + } + + @Test public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded() throws IOException, ExecutionException, InterruptedException { @@ -197,47 +269,33 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream() Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size()); } - @Test - public void testWriteIsHandledForBufferAllocatedLargerThanWritten() + @ParameterizedTest + @ValueSource(ints = {1, 16}) + public void testWriteIsHandledForBufferWithNioBufAllocatedLargerThanWritten(int numberOfChunks) throws IOException, ExecutionException, InterruptedException { + // Use ByteBuf that returns nioBuffer with limit < capacity this can cause some edge cases to trigger depending + // on specific CodedOutputStream apis used var outputBuffersCreated = new ConcurrentLinkedQueue(); - var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(1, 200)); - ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100); - byteBuffer.put(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)); - byteBuffer.flip(); + var dataBytes = FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8); - serializer.addDataMessage(TrafficObservation.WRITE_FIELD_NUMBER, WriteObservation.DATA_FIELD_NUMBER, REFERENCE_TIMESTAMP, byteBuffer); - var future = serializer.flushCommitAndResetStream(true); - future.get(); + Assertions.assertTrue(dataBytes.length >= numberOfChunks); - Assertions.assertEquals(0, byteBuffer.position()); + var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) + + CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING) + 2; - var outputBuffersList = new ArrayList<>(outputBuffersCreated); - TrafficStream reconstitutedTrafficStream = TrafficStream.parseFrom(outputBuffersList.get(0)); - Assertions.assertEquals(FAKE_READ_PACKET_DATA, reconstitutedTrafficStream.getSubStream(0).getWrite().getData().toStringUtf8()); - } + var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, 1,2, + Unpooled.wrappedBuffer(dataBytes,0,dataBytes.length / numberOfChunks)) + streamOverheadBytes; - @Test - public void testWriteIsHandledForBufferAllocatedLargerThanWrittenWithChunking() - throws IOException, ExecutionException, InterruptedException { - var outputBuffersCreated = new ConcurrentLinkedQueue(); - var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(1, 4)); + var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks); - ByteBuffer byteBuffer = ByteBuffer.allocate(100); - byteBuffer.put(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)); - byteBuffer.flip(); + var readOnlyBuf = createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(dataBytes); - Assertions.assertEquals(0, byteBuffer.position()); - Assertions.assertEquals(100, byteBuffer.capacity()); - Assertions.assertEquals(16, byteBuffer.limit()); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, readOnlyBuf); - serializer.addDataMessage(TrafficObservation.WRITE_FIELD_NUMBER, WriteObservation.DATA_FIELD_NUMBER, REFERENCE_TIMESTAMP, byteBuffer); var future = serializer.flushCommitAndResetStream(true); future.get(); - Assertions.assertEquals(0, byteBuffer.position()); - List observations = new ArrayList<>(); for (ByteBuffer buffer : outputBuffersCreated) { var trafficStream = TrafficStream.parseFrom(buffer); @@ -246,12 +304,41 @@ public void testWriteIsHandledForBufferAllocatedLargerThanWrittenWithChunking() StringBuilder reconstructedData = new StringBuilder(); for (TrafficObservation observation : observations) { - var stringChunk = observation.getWriteSegment().getData().toStringUtf8(); + var stringChunk = ((numberOfChunks == 1) ? observation.getWrite().getData() + : observation.getWriteSegment().getData()).toStringUtf8(); reconstructedData.append(stringChunk); } Assertions.assertEquals(FAKE_READ_PACKET_DATA, reconstructedData.toString()); + // Expect extra observation for EndOfSegmentMessage when chunked + var expectedObservations = numberOfChunks > 1 ? numberOfChunks + 1 : numberOfChunks; + Assertions.assertEquals(expectedObservations, observations.size()); + Assertions.assertEquals(0, readOnlyBuf.readerIndex()); } + public static io.netty.buffer.ByteBuf createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(byte[] dataBytes) { + // Force creation of a ByteBuf that returns nioBuffer with limit < capacity. + // This can cause some edge cases to trigger during interaction with underlying ByteBuffers + final int spaceBetweenLimitAndCapacity = 100; + final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(dataBytes.length + spaceBetweenLimitAndCapacity); + byteBuffer.put(dataBytes); + byteBuffer.position(dataBytes.length + spaceBetweenLimitAndCapacity); + byteBuffer.flip(); + + io.netty.buffer.ByteBuf buf = Unpooled.wrappedBuffer(byteBuffer.asReadOnlyBuffer()); + buf.writerIndex(buf.writerIndex() - spaceBetweenLimitAndCapacity); + + // Double check ByteBuf behaves as expected + var nioBuf = buf.nioBuffer(); + Assertions.assertEquals(0, nioBuf.position()); + Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, nioBuf.capacity()); + Assertions.assertEquals(dataBytes.length, nioBuf.limit()); + + Assertions.assertEquals(dataBytes.length, buf.readableBytes()); + Assertions.assertEquals(0, buf.readerIndex()); + Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, buf.capacity()); + + return buf; + } @Test public void testWithLimitlessCodedOutputStreamHolder()