diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index b7a82c8a12..8f9ee1f5e6 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -18,6 +18,8 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.SocketAddress; @@ -60,9 +62,10 @@ * } * */ -@Slf4j public class StreamChannelConnectionCaptureSerializer implements IChannelConnectionCaptureSerializer { + private final org.slf4j.Logger log; + private static final int MAX_ID_SIZE = 100; private boolean readObservationsAreWaitingForEom; @@ -79,6 +82,15 @@ public class StreamChannelConnectionCaptureSerializer implements IChannelConn public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId, @NonNull StreamLifecycleManager streamLifecycleManager) { + this(nodeId, connectionId, streamLifecycleManager, LoggerFactory.getLogger(StreamChannelConnectionCaptureSerializer.class)); + } + + + // Exposed for testing + public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId, + StreamLifecycleManager streamLifecycleManager, + Logger log) { + this.log = log; this.streamManager = streamLifecycleManager; assert (nodeId == null ? 0 : CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, nodeId)) + CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionId) diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 172205482a..6fc0d75b36 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -1,6 +1,7 @@ package org.opensearch.migrations.trafficcapture; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; import io.netty.buffer.Unpooled; import lombok.AllArgsConstructor; @@ -8,6 +9,8 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; @@ -16,6 +19,7 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.WriteObservation; +import org.slf4j.Logger; import java.io.IOException; import java.nio.ByteBuffer; @@ -349,6 +353,32 @@ public void testInitializationWithRealIds() { new StreamManager(getEstimatedTrafficStreamByteSize(0, 0), outputBuffersCreated)); } + @Test + public void testOutputStreamReportsIncorrectSpaceLeft_thenObservationSizeSanityCheckLogAppears() throws IOException { + Logger mockLogger = Mockito.mock(Logger.class); + + StreamManager mockStreamManager = Mockito.mock(StreamManager.class); + CodedOutputStreamHolder mockHolder = Mockito.mock(CodedOutputStreamHolder.class); + CodedOutputStream mockOutputStream = Mockito.mock(CodedOutputStream.class); + + Mockito.when(mockStreamManager.createStream()).thenReturn(mockHolder); + Mockito.when(mockHolder.getOutputStream()).thenReturn(mockOutputStream); + Mockito.when(mockOutputStream.spaceLeft()).thenReturn(5); + var serializer = new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING, TEST_TRAFFIC_STREAM_ID_STRING, + mockStreamManager, mockLogger); + var bb = Unpooled.buffer(getEstimatedTrafficStreamByteSize(1, 0)); + serializer.addWriteEvent(Instant.now(), bb); + serializer.flushCommitAndResetStream(true); + bb.release(); + + Mockito.verify(mockLogger, Mockito.times(1)).warn( + ArgumentMatchers.eq("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated at least {} bytes remaining, this should be investigated"), + ArgumentMatchers.any(int.class), + ArgumentMatchers.any(String.class), + ArgumentMatchers.any(int.class), + ArgumentMatchers.any(int.class)); + } + @AllArgsConstructor static class StreamManager extends OrderedStreamLifecyleManager { int bufferSize;