diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 06c4c3d0c4..c30cae6fcf 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -5,6 +5,7 @@ import com.google.protobuf.Timestamp; import io.netty.buffer.ByteBuf; import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; @@ -17,8 +18,6 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.SocketAddress; @@ -61,10 +60,9 @@ * } * */ +@Slf4j public class StreamChannelConnectionCaptureSerializer implements IChannelConnectionCaptureSerializer { - private final org.slf4j.Logger log; - // 100 is the default size of netty connectionId and kafka nodeId along with serializationTags private static final int MAX_ID_SIZE = 100; @@ -82,15 +80,6 @@ public class StreamChannelConnectionCaptureSerializer implements IChannelConn public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId, @NonNull StreamLifecycleManager streamLifecycleManager) { - this(nodeId, connectionId, streamLifecycleManager, LoggerFactory.getLogger(StreamChannelConnectionCaptureSerializer.class)); - } - - - // Exposed for testing - public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId, - StreamLifecycleManager streamLifecycleManager, - Logger log) { - this.log = log; this.streamManager = streamLifecycleManager; assert (nodeId == null ? 0 : CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, nodeId)) + CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionId) @@ -160,7 +149,8 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize; // Ensure space is available before starting an observation if (getOrCreateCodedOutputStream().spaceLeft() < - CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)) { + CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)) + { flushCommitAndResetStream(false); } // e.g. 2 { @@ -279,7 +269,8 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) { segmentFieldNumber = TrafficObservation.READSEGMENT_FIELD_NUMBER; segmentDataFieldNumber = ReadSegmentObservation.DATA_FIELD_NUMBER; - } else { + } + else { segmentFieldNumber = TrafficObservation.WRITESEGMENT_FIELD_NUMBER; segmentDataFieldNumber = WriteSegmentObservation.DATA_FIELD_NUMBER; } @@ -289,7 +280,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant // when considering the case of a message that does not need segments or for the case of a smaller segment created // from a much larger message int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, - segmentFieldNumber, segmentDataFieldNumber, byteBuffer); + segmentFieldNumber, segmentDataFieldNumber, byteBuffer); int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary. @@ -305,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant return; } - while (byteBuffer.position() < byteBuffer.limit()) { + while(byteBuffer.position() < byteBuffer.limit()) { int availableCOSSpace = getOrCreateCodedOutputStream().spaceLeft(); int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position(); ByteBuffer bb = byteBuffer.slice(); @@ -326,7 +317,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant } private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount, - Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException { + Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException { int dataSize = 0; int segmentCountSize = 0; int captureClosureLength = 1; @@ -358,7 +349,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in } private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, - java.nio.ByteBuffer byteBuffer) throws IOException { + java.nio.ByteBuffer byteBuffer) throws IOException { addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuffer); } @@ -465,8 +456,8 @@ private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, in int actualRemainingSpace = getOrCreateCodedOutputStream().spaceLeft(); if (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0) { log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " + - "at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1), - actualRemainingSpace, minExpectedSpaceAfterObservation); + "at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1), + actualRemainingSpace, minExpectedSpaceAfterObservation); } } } diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 6fc0d75b36..f393957827 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -1,16 +1,26 @@ package org.opensearch.migrations.trafficcapture; +import static org.junit.jupiter.api.Assertions.assertThrows; + import com.google.protobuf.ByteString; -import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Base64; +import java.util.Comparator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; import lombok.AllArgsConstructor; import lombok.Lombok; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mockito; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; @@ -19,88 +29,48 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; -import org.slf4j.Logger; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.time.Clock; -import java.time.Instant; -import java.util.ArrayList; -import java.util.Base64; -import java.util.Comparator; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ExecutionException; - -import static org.junit.jupiter.api.Assertions.assertThrows; @Slf4j class StreamChannelConnectionCaptureSerializerTest { - private final static String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; - private final static String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; + public static final String TEST_TRAFFIC_STREAM_ID_STRING = "Test"; public static final String TEST_NODE_ID_STRING = "test_node_id"; + // Reference Timestamp chosen in the future with nanosecond precision resemble an upper bound on space overhead + public static final Instant REFERENCE_TIMESTAMP = Instant.parse("2999-01-01T23:59:59.98765432Z"); + private final static String FAKE_EXCEPTION_DATA = "abcdefghijklmnop"; + private final static String FAKE_READ_PACKET_DATA = "ABCDEFGHIJKLMNOP"; + private static int getEstimatedTrafficStreamByteSize(int readWriteEventCount, int averageDataPacketSize) { - var fixedTimestamp = Timestamp.newBuilder() - .setSeconds(Instant.now().getEpochSecond()) - .setNanos(Instant.now().getNano()) - .build(); - - return TrafficStream.newBuilder() - .setNodeId(TEST_NODE_ID_STRING) - .setConnectionId(TEST_TRAFFIC_STREAM_ID_STRING) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setClose(CloseObservation.newBuilder().build()).build()) - .build().getSerializedSize() + - - (((TrafficObservation.newBuilder() - .setTs(fixedTimestamp) - .setWrite(WriteObservation.newBuilder().build()).build()).getSerializedSize() - + 2 // add 2 for subStream Overhead - ) * readWriteEventCount) - + averageDataPacketSize * readWriteEventCount; + var fixedTimestamp = Timestamp.newBuilder().setSeconds(REFERENCE_TIMESTAMP.getEpochSecond()) + .setNanos(REFERENCE_TIMESTAMP.getNano()).build(); + + return TrafficStream.newBuilder().setNodeId(TEST_NODE_ID_STRING).setConnectionId(TEST_TRAFFIC_STREAM_ID_STRING) + .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) + .setClose(CloseObservation.newBuilder().build()).build()).build().getSerializedSize() + + + (((TrafficObservation.newBuilder().setTs(fixedTimestamp).setWrite(WriteObservation.newBuilder().build()) + .build()).getSerializedSize() + 2 // add 2 for subStream Overhead + ) * readWriteEventCount) + averageDataPacketSize * readWriteEventCount; } private static TrafficStream makeSampleTrafficStream(Instant t) { - var fixedTimestamp = Timestamp.newBuilder() - .setSeconds(t.getEpochSecond()) - .setNanos(t.getNano()) - .build(); - return TrafficStream.newBuilder() - .setNodeId(TEST_NODE_ID_STRING) - .setConnectionId(TEST_TRAFFIC_STREAM_ID_STRING) - .setNumberOfThisLastChunk(1) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setRead(ReadObservation.newBuilder() - .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8))) - .build()) - .build()) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setRead(ReadObservation.newBuilder() - .build()) - .build()) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setConnectionException(ConnectionExceptionObservation.newBuilder() - .setMessage(FAKE_EXCEPTION_DATA) - .build()) - .build()) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setConnectionException(ConnectionExceptionObservation.newBuilder() - .build()) - .build()) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setEndOfMessageIndicator(EndOfMessageIndication.newBuilder() - .setFirstLineByteLength(17) - .setHeadersByteLength(72) - .build()) - .build()) - .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) - .setClose(CloseObservation.newBuilder().build()) - .build()) - .build(); + var fixedTimestamp = Timestamp.newBuilder().setSeconds(t.getEpochSecond()).setNanos(t.getNano()).build(); + return TrafficStream.newBuilder().setNodeId(TEST_NODE_ID_STRING).setConnectionId(TEST_TRAFFIC_STREAM_ID_STRING) + .setNumberOfThisLastChunk(1).addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp).setRead( + ReadObservation.newBuilder() + .setData(ByteString.copyFrom(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8))).build()) + .build()).addSubStream( + TrafficObservation.newBuilder().setTs(fixedTimestamp).setRead(ReadObservation.newBuilder().build()) + .build()).addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp).setConnectionException( + ConnectionExceptionObservation.newBuilder().setMessage(FAKE_EXCEPTION_DATA).build()).build()) + .addSubStream(TrafficObservation.newBuilder().setTs(fixedTimestamp) + .setConnectionException(ConnectionExceptionObservation.newBuilder().build()).build()).addSubStream( + TrafficObservation.newBuilder().setTs(fixedTimestamp).setEndOfMessageIndicator( + EndOfMessageIndication.newBuilder().setFirstLineByteLength(17).setHeadersByteLength(72).build()) + .build()).addSubStream( + TrafficObservation.newBuilder().setTs(fixedTimestamp).setClose(CloseObservation.newBuilder().build()) + .build()).build(); } private static int getIndexForTrafficStream(TrafficStream s) { @@ -109,7 +79,6 @@ private static int getIndexForTrafficStream(TrafficStream s) { @Test public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(Clock.systemUTC()); var outputBuffersCreated = new ConcurrentLinkedQueue(); var serializer = createSerializerWithTestHandler(outputBuffersCreated, 1024 * 1024); @@ -117,7 +86,7 @@ public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, String data = FAKE_READ_PACKET_DATA.repeat((1024 * 1024 / FAKE_READ_PACKET_DATA.length()) + 1); byte[] fakeDataBytes = data.getBytes(StandardCharsets.UTF_8); var bb = Unpooled.wrappedBuffer(fakeDataBytes); - serializer.addReadEvent(referenceTimestamp, bb); + serializer.addReadEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -128,8 +97,8 @@ public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, for (int i = 0; i < 2; ++i) { reconstitutedTrafficStreamsList.add(TrafficStream.parseFrom(outputBuffersList.get(i))); } - reconstitutedTrafficStreamsList - .sort(Comparator.comparingInt(StreamChannelConnectionCaptureSerializerTest::getIndexForTrafficStream)); + reconstitutedTrafficStreamsList.sort( + Comparator.comparingInt(StreamChannelConnectionCaptureSerializerTest::getIndexForTrafficStream)); int totalSize = 0; for (int i = 0; i < 2; ++i) { var reconstitutedTrafficStream = reconstitutedTrafficStreamsList.get(i); @@ -142,17 +111,15 @@ public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, @Test public void testBasicDataConsistencyWhenChunking() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(); var packetData = FAKE_READ_PACKET_DATA.repeat(500); byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); var outputBuffersCreated = new ConcurrentLinkedQueue(); - // Picking buffer to half size so as to require require chunking + // Picking buffer to half size to require chunking var serializer = createSerializerWithTestHandler(outputBuffersCreated, - getEstimatedTrafficStreamByteSize( - 1, packetBytes.length) / 2); + getEstimatedTrafficStreamByteSize(1, packetBytes.length) / 2); var bb = Unpooled.wrappedBuffer(packetBytes); - serializer.addWriteEvent(referenceTimestamp, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -173,21 +140,18 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution } @Test - public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(); + public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded() + throws IOException, ExecutionException, InterruptedException { byte[] packetBytes = FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8); var outputBuffersCreated = new ConcurrentLinkedQueue(); // Picking small buffer that can only hold one write observation and no other observations var serializer = createSerializerWithTestHandler(outputBuffersCreated, - getEstimatedTrafficStreamByteSize( - 1, packetBytes.length) - - CloseObservation.newBuilder().build().getSerializedSize() - ); - + getEstimatedTrafficStreamByteSize(1, packetBytes.length) - CloseObservation.newBuilder().build() + .getSerializedSize()); var bb = Unpooled.wrappedBuffer(packetBytes); - serializer.addWriteEvent(referenceTimestamp, bb); - serializer.addCloseEvent(referenceTimestamp); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); + serializer.addCloseEvent(REFERENCE_TIMESTAMP); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -205,15 +169,13 @@ public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded() throws IOEx @Test public void testEmptyPacketIsHandledForSmallCodedOutputStream() - throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(Clock.systemUTC()); + throws IOException, ExecutionException, InterruptedException { var outputBuffersCreated = new ConcurrentLinkedQueue(); // Picking small buffer size that can only hold two empty message - var serializer = createSerializerWithTestHandler(outputBuffersCreated, - getEstimatedTrafficStreamByteSize(2, 0)); + var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(2, 0)); var bb = Unpooled.buffer(0); - serializer.addWriteEvent(referenceTimestamp, bb); - serializer.addWriteEvent(referenceTimestamp, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -224,17 +186,10 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream() Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size()); } - private static class TestException extends RuntimeException { - public TestException(String message) { - super(message); - } - } - @Test public void testThatReadCanBeDeserialized() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(Clock.systemUTC()); // these are only here as a debugging aid - var groundTruth = makeSampleTrafficStream(referenceTimestamp); + var groundTruth = makeSampleTrafficStream(REFERENCE_TIMESTAMP); System.err.println("groundTruth: " + groundTruth); // Pasting this into `base64 -d | protoc --decode_raw` will also show the structure var groundTruthBytes = groundTruth.toByteArray(); @@ -243,15 +198,15 @@ public void testThatReadCanBeDeserialized() throws IOException, ExecutionExcepti var outputBuffersCreated = new ConcurrentLinkedQueue(); var serializer = createSerializerWithTestHandler(outputBuffersCreated, 1024 * 1024); var bb = Unpooled.wrappedBuffer(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8)); - serializer.addReadEvent(referenceTimestamp, bb); + serializer.addReadEvent(REFERENCE_TIMESTAMP, bb); bb.clear(); - serializer.addReadEvent(referenceTimestamp, bb); - serializer.addExceptionCaughtEvent(referenceTimestamp, new TestException(FAKE_EXCEPTION_DATA)); - serializer.addExceptionCaughtEvent(referenceTimestamp, new TestException("")); + serializer.addReadEvent(REFERENCE_TIMESTAMP, bb); + serializer.addExceptionCaughtEvent(REFERENCE_TIMESTAMP, new TestException(FAKE_EXCEPTION_DATA)); + serializer.addExceptionCaughtEvent(REFERENCE_TIMESTAMP, new TestException("")); serializer.addEndOfFirstLineIndicator(17); serializer.addEndOfHeadersIndicator(72); - serializer.commitEndOfHttpMessageIndicator(referenceTimestamp); - serializer.addCloseEvent(referenceTimestamp); + serializer.commitEndOfHttpMessageIndicator(REFERENCE_TIMESTAMP); + serializer.addCloseEvent(REFERENCE_TIMESTAMP); serializer.flushCommitAndResetStream(true).get(); bb.release(); @@ -269,17 +224,17 @@ public void testThatReadCanBeDeserialized() throws IOException, ExecutionExcepti } @Test - public void testEndOfSegmentsIndicationAddedWhenChunking() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(); + public void testEndOfSegmentsIndicationAddedWhenChunking() + throws IOException, ExecutionException, InterruptedException { var packetData = FAKE_READ_PACKET_DATA.repeat(500); byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); var outputBuffersCreated = new ConcurrentLinkedQueue(); - // Picking buffer to half size so as to require require chunking + // Picking buffer to half size to require chunking var serializer = createSerializerWithTestHandler(outputBuffersCreated, - getEstimatedTrafficStreamByteSize(1, packetBytes.length) / 2); + getEstimatedTrafficStreamByteSize(1, packetBytes.length) / 2); var bb = Unpooled.wrappedBuffer(packetBytes); - serializer.addWriteEvent(referenceTimestamp, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -302,8 +257,8 @@ public void testEndOfSegmentsIndicationAddedWhenChunking() throws IOException, E } @Test - public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() throws IOException, ExecutionException, InterruptedException { - final var referenceTimestamp = Instant.now(); + public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() + throws IOException, ExecutionException, InterruptedException { var packetData = FAKE_READ_PACKET_DATA.repeat(10); byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); var outputBuffersCreated = new ConcurrentLinkedQueue(); @@ -311,7 +266,7 @@ public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() throws IOExcept var serializer = createSerializerWithTestHandler(outputBuffersCreated, 500); var bb = Unpooled.wrappedBuffer(packetBytes); - serializer.addWriteEvent(referenceTimestamp, bb); + serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb); var future = serializer.flushCommitAndResetStream(true); future.get(); bb.release(); @@ -336,11 +291,13 @@ public void testAssertionErrorDuringInitializationWhenInitializeWithTooLargeId() final String realNodeId = "b671d2f2-577b-414e-9eb4-8bc3e89ee182"; final String realKafkaConnectionId = "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"; + // Prepending "a" to a realNodeId to create a larger than expected id to trigger failure + final String tooLargeNodeId = 'a' + realNodeId; + var outputBuffersCreated = new ConcurrentLinkedQueue(); - assertThrows(AssertionError.class, () -> - new StreamChannelConnectionCaptureSerializer<>("a" + realNodeId, realKafkaConnectionId, - new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated)) - ); + assertThrows(AssertionError.class, + () -> new StreamChannelConnectionCaptureSerializer<>("a" + tooLargeNodeId, realKafkaConnectionId, + new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated))); } @Test @@ -350,37 +307,25 @@ public void testInitializationWithRealIds() { var outputBuffersCreated = new ConcurrentLinkedQueue(); new StreamChannelConnectionCaptureSerializer<>(realNodeId, realKafkaConnectionId, - new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated)); + new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated)); } - @Test - public void testOutputStreamReportsIncorrectSpaceLeft_thenObservationSizeSanityCheckLogAppears() throws IOException { - Logger mockLogger = Mockito.mock(Logger.class); - - StreamManager mockStreamManager = Mockito.mock(StreamManager.class); - CodedOutputStreamHolder mockHolder = Mockito.mock(CodedOutputStreamHolder.class); - CodedOutputStream mockOutputStream = Mockito.mock(CodedOutputStream.class); - - Mockito.when(mockStreamManager.createStream()).thenReturn(mockHolder); - Mockito.when(mockHolder.getOutputStream()).thenReturn(mockOutputStream); - Mockito.when(mockOutputStream.spaceLeft()).thenReturn(5); - var serializer = new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING, TEST_TRAFFIC_STREAM_ID_STRING, - mockStreamManager, mockLogger); - var bb = Unpooled.buffer(getEstimatedTrafficStreamByteSize(1, 0)); - serializer.addWriteEvent(Instant.now(), bb); - serializer.flushCommitAndResetStream(true); - bb.release(); + private StreamChannelConnectionCaptureSerializer createSerializerWithTestHandler( + ConcurrentLinkedQueue outputBuffers, int bufferSize) { + return new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING, TEST_TRAFFIC_STREAM_ID_STRING, + new StreamManager(bufferSize, outputBuffers)); + } + + private static class TestException extends RuntimeException { - Mockito.verify(mockLogger, Mockito.times(1)).warn( - ArgumentMatchers.eq("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated at least {} bytes remaining, this should be investigated"), - ArgumentMatchers.any(int.class), - ArgumentMatchers.any(String.class), - ArgumentMatchers.any(int.class), - ArgumentMatchers.any(int.class)); + public TestException(String message) { + super(message); + } } @AllArgsConstructor static class StreamManager extends OrderedStreamLifecyleManager { + int bufferSize; ConcurrentLinkedQueue outputBuffers; @@ -392,8 +337,8 @@ public CodedOutputStreamHolder createStream() { @Override protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder outputStreamHolder, int index) { if (!(outputStreamHolder instanceof CodedOutputStreamAndByteBufferWrapper)) { - throw new IllegalStateException("Unknown outputStreamHolder sent back to StreamManager: " + - outputStreamHolder); + throw new IllegalStateException( + "Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder; log.trace("Getting ready to flush for " + osh); @@ -415,10 +360,4 @@ protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder out }).thenApply(x -> null); } } - - private StreamChannelConnectionCaptureSerializer - createSerializerWithTestHandler(ConcurrentLinkedQueue outputBuffers, int bufferSize) { - return new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING, TEST_TRAFFIC_STREAM_ID_STRING, - new StreamManager(bufferSize, outputBuffers)); - } } \ No newline at end of file