Add StreamChannelConnectionCaptureSerializerTests around segmented observations at capacity of stream

Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed May 2, 2024
1 parent 0d75045 commit e96634a
Showing 1 changed file with 116 additions and 29 deletions.
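For context, the round trip these tests exercise looks roughly like the sketch below. It is a minimal sketch, not a runnable standalone program: it assumes this test class's own helpers and constants (createSerializerWithTestHandler, REFERENCE_TIMESTAMP, the @Slf4j log field), and the 64-byte stream size is purely illustrative.

    // Minimal sketch, assuming the helpers defined in this test class.
    var outputBuffers = new ConcurrentLinkedQueue<ByteBuffer>();
    var serializer = createSerializerWithTestHandler(outputBuffers, 64); // 64 bytes: illustrative only
    serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer("hello".getBytes(StandardCharsets.UTF_8)));
    serializer.flushCommitAndResetStream(true).get();
    for (ByteBuffer buffer : outputBuffers) {
        var trafficStream = TrafficStream.parseFrom(buffer);
        // A write that fits arrives as a Write observation; a write that must be chunked
        // arrives as WriteSegment observations plus a trailing EndOfSegmentMessage.
        log.info("{} observation(s)", trafficStream.getSubStreamCount());
    }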
@@ -24,6 +24,9 @@
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializerTest.StreamManager.NullStreamManager;
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
@@ -33,6 +36,7 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation;

@Slf4j
class StreamChannelConnectionCaptureSerializerTest {
@@ -150,6 +154,74 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution
Assertions.assertEquals(packetData, reconstructedData.toString());
}

@Test
public void testWriteObservationWithSpaceForOneByteAtATime() throws IOException, ExecutionException, InterruptedException {
var packetData = FAKE_READ_PACKET_DATA.repeat(5);
byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8);
var numberOfChunks = packetBytes.length;

var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();

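// Per-stream overhead: the connectionId and nodeId fields written with every flushed stream,
// plus the numberOfThisLastChunk field that is written when the stream is flushed.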
var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) +
CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING);
var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, packetData.length());

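// Size the stream so each segmented observation has room for exactly one payload byte.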
var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP,
TrafficObservation.WRITESEGMENT_FIELD_NUMBER,
WriteSegmentObservation.DATA_FIELD_NUMBER,
Unpooled.wrappedBuffer(packetBytes, 0, packetBytes.length / numberOfChunks)) + streamOverheadBytes + spaceNeededForFlush;

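// Sanity check: a single-byte chunk should serialize to two bytes, a one-byte length prefix plus the payload byte.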
Assertions.assertEquals(2, CodedOutputStreamSizeUtil
.computeByteBufRemainingSizeNoTag(Unpooled.wrappedBuffer(packetBytes, 0, packetBytes.length / numberOfChunks)));

var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks);

serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes));

var future = serializer.flushCommitAndResetStream(true);
future.get();

List<TrafficObservation> observations = new ArrayList<>();
for (ByteBuffer buffer : outputBuffersCreated) {
var trafficStream = TrafficStream.parseFrom(buffer);
observations.add(trafficStream.getSubStream(0));
}

StringBuilder reconstructedData = new StringBuilder();
for (TrafficObservation observation : observations) {
var stringChunk = observation.getWriteSegment().getData().toStringUtf8();
reconstructedData.append(stringChunk);
}
Assertions.assertEquals(packetData, reconstructedData.toString());
// Expect extra observation for EndOfSegmentMessage when chunked
Assertions.assertEquals(numberOfChunks + 1, observations.size());
}

@Test
public void testExceptionWhenStreamTooSmallForObservation() throws IOException, ExecutionException, InterruptedException {
byte[] packetBytes = new byte[1];
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();

var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) +
CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING);
var spaceNeededForFlush = CodedOutputStream.computeInt32Size(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, 1);

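// Sizing against an empty buffer (Unpooled.buffer()) reserves room for the observation framing but none for payload bytes.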
var bufferSizeWithoutSpaceForBytes = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP,
TrafficObservation.WRITESEGMENT_FIELD_NUMBER,
WriteSegmentObservation.DATA_FIELD_NUMBER,
Unpooled.buffer()) + streamOverheadBytes + spaceNeededForFlush;

var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeWithoutSpaceForBytes);

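// With no room for even one payload byte, adding a write event cannot make progress and should throw.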
Assertions.assertThrows(IllegalStateException.class, () -> {
serializer.addWriteEvent(REFERENCE_TIMESTAMP, Unpooled.wrappedBuffer(packetBytes));
});

var future = serializer.flushCommitAndResetStream(true);
future.get();
}

@Test
public void testCloseObservationAfterWriteWillFlushWhenSpaceNeeded()
throws IOException, ExecutionException, InterruptedException {
@@ -197,47 +269,33 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream()
Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size());
}

@Test
public void testWriteIsHandledForBufferAllocatedLargerThanWritten()
@ParameterizedTest
@ValueSource(ints = {1, 16})
public void testWriteIsHandledForBufferWithNioBufAllocatedLargerThanWritten(int numberOfChunks)
throws IOException, ExecutionException, InterruptedException {
// Use a ByteBuf that returns an nioBuffer with limit < capacity; this can trigger edge cases
// depending on which specific CodedOutputStream APIs are used
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(1, 200));

ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100);
byteBuffer.put(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
var dataBytes = FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8);

serializer.addDataMessage(TrafficObservation.WRITE_FIELD_NUMBER, WriteObservation.DATA_FIELD_NUMBER, REFERENCE_TIMESTAMP, byteBuffer);
var future = serializer.flushCommitAndResetStream(true);
future.get();
Assertions.assertTrue(dataBytes.length >= numberOfChunks);

Assertions.assertEquals(0, byteBuffer.position());
var streamOverheadBytes = CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, TEST_TRAFFIC_STREAM_ID_STRING) +
CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, TEST_NODE_ID_STRING) + 2;

var outputBuffersList = new ArrayList<>(outputBuffersCreated);
TrafficStream reconstitutedTrafficStream = TrafficStream.parseFrom(outputBuffersList.get(0));
Assertions.assertEquals(FAKE_READ_PACKET_DATA, reconstitutedTrafficStream.getSubStream(0).getWrite().getData().toStringUtf8());
}
var bufferSizeToGetDesiredChunks = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(REFERENCE_TIMESTAMP, 1, 2,
Unpooled.wrappedBuffer(dataBytes, 0, dataBytes.length / numberOfChunks)) + streamOverheadBytes;

@Test
public void testWriteIsHandledForBufferAllocatedLargerThanWrittenWithChunking()
throws IOException, ExecutionException, InterruptedException {
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(1, 4));
var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSizeToGetDesiredChunks);

ByteBuffer byteBuffer = ByteBuffer.allocate(100);
byteBuffer.put(FAKE_READ_PACKET_DATA.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
var readOnlyBuf = createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(dataBytes);

Assertions.assertEquals(0, byteBuffer.position());
Assertions.assertEquals(100, byteBuffer.capacity());
Assertions.assertEquals(16, byteBuffer.limit());
serializer.addWriteEvent(REFERENCE_TIMESTAMP, readOnlyBuf);

serializer.addDataMessage(TrafficObservation.WRITE_FIELD_NUMBER, WriteObservation.DATA_FIELD_NUMBER, REFERENCE_TIMESTAMP, byteBuffer);
var future = serializer.flushCommitAndResetStream(true);
future.get();

Assertions.assertEquals(0, byteBuffer.position());

List<TrafficObservation> observations = new ArrayList<>();
for (ByteBuffer buffer : outputBuffersCreated) {
var trafficStream = TrafficStream.parseFrom(buffer);
@@ -246,12 +304,41 @@ public void testWriteIsHandledForBufferAllocatedLargerThanWrittenWithChunking()

StringBuilder reconstructedData = new StringBuilder();
for (TrafficObservation observation : observations) {
var stringChunk = observation.getWriteSegment().getData().toStringUtf8();
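// A single-chunk write arrives as a plain Write observation; multi-chunk writes arrive as WriteSegment observations.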
var stringChunk = ((numberOfChunks == 1) ? observation.getWrite().getData()
: observation.getWriteSegment().getData()).toStringUtf8();
reconstructedData.append(stringChunk);
}
Assertions.assertEquals(FAKE_READ_PACKET_DATA, reconstructedData.toString());
// Expect extra observation for EndOfSegmentMessage when chunked
var expectedObservations = numberOfChunks > 1 ? numberOfChunks + 1 : numberOfChunks;
Assertions.assertEquals(expectedObservations, observations.size());
Assertions.assertEquals(0, readOnlyBuf.readerIndex());
}

public static io.netty.buffer.ByteBuf createReadOnlyByteBufWithNioByteBufferWithCapacityLargerThanLimit(byte[] dataBytes) {
// Force creation of a ByteBuf whose nioBuffer() has limit < capacity.
// This can trigger edge cases in interactions with the underlying ByteBuffers.
final int spaceBetweenLimitAndCapacity = 100;
final ByteBuffer byteBuffer = ByteBuffer.allocateDirect(dataBytes.length + spaceBetweenLimitAndCapacity);
byteBuffer.put(dataBytes);
byteBuffer.position(dataBytes.length + spaceBetweenLimitAndCapacity);
byteBuffer.flip();

io.netty.buffer.ByteBuf buf = Unpooled.wrappedBuffer(byteBuffer.asReadOnlyBuffer());
buf.writerIndex(buf.writerIndex() - spaceBetweenLimitAndCapacity);
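// Pull the writer index back so readableBytes() equals dataBytes.length while the capacity stays larger.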

// Double-check that the ByteBuf behaves as expected
var nioBuf = buf.nioBuffer();
Assertions.assertEquals(0, nioBuf.position());
Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, nioBuf.capacity());
Assertions.assertEquals(dataBytes.length, nioBuf.limit());

Assertions.assertEquals(dataBytes.length, buf.readableBytes());
Assertions.assertEquals(0, buf.readerIndex());
Assertions.assertEquals(dataBytes.length + spaceBetweenLimitAndCapacity, buf.capacity());

return buf;
}

@Test
public void testWithLimitlessCodedOutputStreamHolder()
