Skip to content

Commit

Permalink
Address PR Comments removing Mockito dependency and simplifying test …
Browse files Browse the repository at this point in the history
…cases

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Feb 1, 2024
1 parent 6a3cdec commit f42650a
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
Expand All @@ -17,8 +18,6 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
Expand Down Expand Up @@ -61,10 +60,9 @@
* }
* </pre>
*/
@Slf4j
public class StreamChannelConnectionCaptureSerializer<T> implements IChannelConnectionCaptureSerializer<T> {

private final org.slf4j.Logger log;

// 100 is the default size of netty connectionId and kafka nodeId along with serializationTags
private static final int MAX_ID_SIZE = 100;

Expand All @@ -82,15 +80,6 @@ public class StreamChannelConnectionCaptureSerializer<T> implements IChannelConn

public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId,
@NonNull StreamLifecycleManager<T> streamLifecycleManager) {
this(nodeId, connectionId, streamLifecycleManager, LoggerFactory.getLogger(StreamChannelConnectionCaptureSerializer.class));
}


// Exposed for testing
public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId,
StreamLifecycleManager<T> streamLifecycleManager,
Logger log) {
this.log = log;
this.streamManager = streamLifecycleManager;
assert (nodeId == null ? 0 : CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, nodeId)) +
CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionId)
Expand Down Expand Up @@ -160,7 +149,8 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize;
// Ensure space is available before starting an observation
if (getOrCreateCodedOutputStream().spaceLeft() <
CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)) {
CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1))
{
flushCommitAndResetStream(false);
}
// e.g. 2 {
Expand Down Expand Up @@ -279,7 +269,8 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) {
segmentFieldNumber = TrafficObservation.READSEGMENT_FIELD_NUMBER;
segmentDataFieldNumber = ReadSegmentObservation.DATA_FIELD_NUMBER;
} else {
}
else {
segmentFieldNumber = TrafficObservation.WRITESEGMENT_FIELD_NUMBER;
segmentDataFieldNumber = WriteSegmentObservation.DATA_FIELD_NUMBER;
}
Expand All @@ -289,7 +280,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
// when considering the case of a message that does not need segments or for the case of a smaller segment created
// from a much larger message
int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp,
segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
Expand All @@ -305,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
return;
}

while (byteBuffer.position() < byteBuffer.limit()) {
while(byteBuffer.position() < byteBuffer.limit()) {
int availableCOSSpace = getOrCreateCodedOutputStream().spaceLeft();
int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position();
ByteBuffer bb = byteBuffer.slice();
Expand All @@ -326,7 +317,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
}

private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount,
Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
int dataSize = 0;
int segmentCountSize = 0;
int captureClosureLength = 1;
Expand Down Expand Up @@ -358,7 +349,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in
}

private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp,
java.nio.ByteBuffer byteBuffer) throws IOException {
java.nio.ByteBuffer byteBuffer) throws IOException {
addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuffer);
}

Expand Down Expand Up @@ -465,8 +456,8 @@ private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, in
int actualRemainingSpace = getOrCreateCodedOutputStream().spaceLeft();
if (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0) {
log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " +
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
}
}
}
Loading

0 comments on commit f42650a

Please sign in to comment.