Skip to content

Commit

Permalink
Fix StreamChannelConnectionCaptureSerializer edge case around segment…
Browse files Browse the repository at this point in the history
…edObservations at capacity of stream

Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed May 2, 2024
1 parent e96634a commit 305c5dc
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

import java.nio.ByteBuffer;
import java.time.Instant;

/**
Expand All @@ -28,18 +28,18 @@ public static int getSizeOfTimestamp(Instant t) {

/**
* This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed
* from the given ByteBuffer and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuffer data and its associated TrafficStream
* from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream
* overhead. The actual required bytes could be marginally smaller.
*/
public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int observationFieldNumber,
int dataFieldNumber, ByteBuffer buffer) {
int dataFieldNumber, ByteBuf buf) {
// Timestamp required bytes
int tsContentSize = getSizeOfTimestamp(timestamp);
int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;

// Capture required bytes
int dataSize = computeByteBufferRemainingSize(dataFieldNumber, buffer);
int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf);
int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize;

// Observation and closing index required bytes
Expand All @@ -48,21 +48,38 @@ public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int
}

/**
* This function determines the number of bytes needed to write the remaining bytes in a byteBuffer and its tag.
* Use this over CodeOutputStream.computeByteBufferSize(int fieldNumber, ByteBuffer buffer) due to the latter
* relying on the ByteBuffer capacity instead of limit in size calculation.
* This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed
* from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream
* overhead. The actual required bytes could be marginally smaller.
*/
public static int bytesNeededForANonSegmentedObservation(Instant timestamp, int observationFieldNumber,
int dataFieldNumber, ByteBuf buf) {
// Timestamp required bytes
int tsContentSize = getSizeOfTimestamp(timestamp);
int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;

// Capture required bytes
int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf);
int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize;

// Observation and closing index required bytes
return bytesNeededForObservationAndClosingIndex(tsTagAndContentSize + captureTagAndContentSize,
Integer.MAX_VALUE);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag.
*/
public static int computeByteBufferRemainingSize(int fieldNumber, ByteBuffer buffer) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufferRemainingSizeNoTag(buffer);
public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buffer) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buffer);
}

/**
* This function determines the number of bytes needed to write the remaining bytes in a byteBuffer. Use this over
* CodeOutputStream.computeByteBufferSizeNoTag(ByteBuffer buffer) due to the latter relying on the
* ByteBuffer capacity instead of limit in size calculation.
* This function determines the number of bytes needed to write the readable bytes in a byteBuf.
*/
public static int computeByteBufferRemainingSizeNoTag(ByteBuffer buffer) {
int bufferSize = buffer.remaining();
public static int computeByteBufRemainingSizeNoTag(ByteBuf buffer) {
int bufferSize = buffer.readableBytes();
return CodedOutputStream.computeUInt32SizeNoTag(bufferSize) + bufferSize;
}

Expand Down
Loading

0 comments on commit 305c5dc

Please sign in to comment.