Commit

Add StreamChannelConnectionCaptureSerializerTest for buffer with fewer bytes written than capacity

Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Apr 30, 2024
1 parent 0ee07e3 commit ef939d6
Showing 2 changed files with 36 additions and 15 deletions.
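The scenario in the commit title is a java.nio.ByteBuffer with fewer bytes written than its capacity, i.e. its position and limit both sit below capacity(). A minimal sketch of that buffer state, using only plain JDK calls (the class name BufferStateSketch is illustrative, not part of the project):

import java.nio.ByteBuffer;

public class BufferStateSketch {
    public static void main(String[] args) {
        // Allocate more space than will be filled, then cap the logical data region.
        ByteBuffer buffer = ByteBuffer.allocateDirect(100);
        buffer.limit(50);     // only the first 50 of the 100 allocated bytes are in play
        buffer.putInt(1);     // write 4 bytes; position advances from 0 to 4

        System.out.println(buffer.capacity());   // 100 - allocated size
        System.out.println(buffer.limit());      // 50  - logical end of the data region
        System.out.println(buffer.position());   // 4   - bytes written so far
        System.out.println(buffer.remaining());  // 46  - limit minus position
    }
}

The serializer's size accounting below reads capacity() while its chunking loop walks position() toward limit(), so these three values disagreeing is exactly what the new test exercises.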
StreamChannelConnectionCaptureSerializer.java

@@ -6,7 +6,6 @@
 import io.netty.buffer.ByteBuf;
 
 import java.util.function.IntSupplier;
-import java.util.function.Supplier;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
@@ -276,8 +275,12 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber,
         writeByteStringToCurrentStream(dataFieldNumber, str);
     }
 
-    private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buffer) throws IOException {
-        var byteBuffer = buffer.nioBuffer();
+    void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buf) throws IOException {
+        addDataMessage(captureFieldNumber, dataFieldNumber, timestamp, buf.nioBuffer());
+    }
+
+    void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuffer nioBuffer) throws IOException {
+        var readOnlyDataBuffer = nioBuffer.asReadOnlyBuffer();
         int segmentFieldNumber;
         int segmentDataFieldNumber;
         if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) {
@@ -294,43 +297,42 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
         // when considering the case of a message that does not need segments or for the case of a smaller segment created
         // from a much larger message
         int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp,
-            segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
-        int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();
+            segmentFieldNumber, segmentDataFieldNumber, readOnlyDataBuffer);
+        int trafficStreamOverhead = messageAndOverheadBytesLeft - readOnlyDataBuffer.capacity();
 
         // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
         flushIfNeeded(() -> (trafficStreamOverhead + 1));
 
         // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
         var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
-        if (byteBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
+        if (readOnlyDataBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
             int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft;
-            addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, byteBuffer);
+            addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, readOnlyDataBuffer);
             observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber);
             return;
         }
 
-        while(byteBuffer.position() < byteBuffer.limit()) {
+        while(readOnlyDataBuffer.position() < readOnlyDataBuffer.limit()) {
             // COS checked for unbounded limit above
             int availableCOSSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
-            int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position();
-            ByteBuffer bb = byteBuffer.duplicate();
+            int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : readOnlyDataBuffer.limit() - readOnlyDataBuffer.position();
+            ByteBuffer bb = readOnlyDataBuffer.slice();
             bb.limit(chunkBytes);
-            bb = bb.duplicate();
-            byteBuffer.position(byteBuffer.position() + chunkBytes);
+            bb = bb.slice();
+            readOnlyDataBuffer.position(readOnlyDataBuffer.position() + chunkBytes);
             addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bb);
             int minExpectedSpaceAfterObservation = availableCOSSpace - chunkBytes - trafficStreamOverhead;
             observationSizeSanityCheck(minExpectedSpaceAfterObservation, segmentFieldNumber);
             // 1 to N-1 chunked messages
-            if (byteBuffer.position() < byteBuffer.limit()) {
+            if (readOnlyDataBuffer.position() < readOnlyDataBuffer.limit()) {
                 flushCommitAndResetStream(false);
                 messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes;
             }
         }
         writeEndOfSegmentMessage(timestamp);
-
     }
 
-    private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount,
+    void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount,
                                      Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
         int dataSize = 0;
         int segmentCountSize = 0;
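One detail of the change above is swapping duplicate() for slice() when carving a chunk out of the data buffer. A short standalone sketch of the JDK difference this relies on (the class name SliceVsDuplicateSketch is illustrative, not project code): duplicate() shares the source's absolute position and limit, so limit(chunkBytes) is an absolute index, whereas slice() starts a fresh view at the current position, re-indexed from zero, so limit(chunkBytes) always means "the next chunkBytes bytes".

import java.nio.ByteBuffer;

public class SliceVsDuplicateSketch {
    public static void main(String[] args) {
        ByteBuffer source = ByteBuffer.allocate(100);
        source.position(40);                  // pretend 40 bytes were already chunked off
        int chunkBytes = 20;                  // we want the next 20 bytes

        // duplicate(): keeps the source's absolute indices (position 40, limit 100).
        ByteBuffer dup = source.duplicate();
        dup.limit(chunkBytes);                // absolute limit 20 is below position 40, so position snaps back to 20
        System.out.println(dup.position() + " " + dup.remaining());     // 20 0 -> wrong region, nothing to read

        // slice(): independent view starting at position 40, indexed from 0.
        ByteBuffer chunk = source.slice();
        chunk.limit(chunkBytes);              // relative: exactly the next 20 bytes of source
        System.out.println(chunk.position() + " " + chunk.remaining()); // 0 20
        System.out.println(chunk.capacity());                           // 60, i.e. source.remaining()
    }
}

The second bb = bb.slice() in the loop then trims the view's capacity down to chunkBytes before it is handed to addSubstreamMessage.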
StreamChannelConnectionCaptureSerializerTest.java

@@ -197,6 +197,25 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream()
         Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size());
     }
 
+    @Test
+    public void testWriteIsHandledForBufferAllocatedLargerThanWritten()
+        throws IOException, ExecutionException, InterruptedException {
+        var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
+        var serializer = createSerializerWithTestHandler(outputBuffersCreated, getEstimatedTrafficStreamByteSize(1, 200));
+
+        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100);
+        byteBuffer.limit(50);
+        byteBuffer.putInt(1);
+
+        serializer.addDataMessage(TrafficObservation.WRITE_FIELD_NUMBER, WriteObservation.DATA_FIELD_NUMBER, REFERENCE_TIMESTAMP, byteBuffer);
+        var future = serializer.flushCommitAndResetStream(true);
+        future.get();
+
+        var outputBuffersList = new ArrayList<>(outputBuffersCreated);
+        TrafficStream reconstitutedTrafficStream = TrafficStream.parseFrom(outputBuffersList.get(0));
+        Assertions.assertEquals(1, reconstitutedTrafficStream.getSubStream(0).getWrite().getData().size());
+    }
+
     @Test
     public void testWithLimitlessCodedOutputStreamHolder()
         throws IOException, ExecutionException, InterruptedException {
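The new ByteBuffer overload exercised by this test wraps its argument with asReadOnlyBuffer() before chunking. A small sketch of the JDK behavior that relies on (the class name ReadOnlyViewSketch is illustrative, not project code): the read-only view shares the underlying bytes but keeps its own position, limit, and mark, so the chunking loop can advance the view without disturbing the caller's buffer, and any write through the view is rejected.

import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

public class ReadOnlyViewSketch {
    public static void main(String[] args) {
        ByteBuffer callerBuffer = ByteBuffer.allocate(100);
        callerBuffer.limit(50);
        callerBuffer.putInt(1);                         // caller's position is now 4

        ByteBuffer view = callerBuffer.asReadOnlyBuffer();
        try {
            view.putInt(2);                             // writes through the view are rejected
        } catch (ReadOnlyBufferException e) {
            System.out.println("read-only view");
        }

        view.position(view.limit());                    // consume the whole view, as the chunking loop does
        System.out.println(callerBuffer.position());    // still 4; the caller's indices are untouched
    }
}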
