Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed May 3, 2024
1 parent 0325b24 commit 61dbc8f
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int
/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag.
*/
public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buffer) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buffer);
public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buf) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buf);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf.
*/
public static int computeByteBufRemainingSizeNoTag(ByteBuf buffer) {
int bufferSize = buffer.readableBytes();
return CodedOutputStream.computeUInt32SizeNoTag(bufferSize) + bufferSize;
public static int computeByteBufRemainingSizeNoTag(ByteBuf buf) {
int bufSize = buf.readableBytes();
return CodedOutputStream.computeUInt32SizeNoTag(bufSize) + bufSize;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import com.google.protobuf.WireFormat;
import io.netty.buffer.ByteBuf;

import java.io.UncheckedIOException;
import java.nio.channels.GatheringByteChannel;
import java.util.stream.IntStream;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
Expand Down Expand Up @@ -136,12 +138,24 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx

public int currentOutputStreamWriteableSpaceLeft() throws IOException {
// Writeable bytes is the space left minus the space needed to complete the next flush
var maxFieldNum = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER);
var spaceNeededForNextFlush = CodedOutputStream.computeInt32Size(maxFieldNum, numFlushesSoFar + 1);
var maxFieldTagNumberToBeWrittenUponStreamFlush = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER);
var spaceNeededForRecordCreationDuringNextFlush = CodedOutputStream.computeInt32Size(maxFieldTagNumberToBeWrittenUponStreamFlush, numFlushesSoFar + 1);
var outputStreamSpaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForNextFlush;
return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForRecordCreationDuringNextFlush;
}

/**
* Checks if the current output stream has enough space for the required size and flushes if not.
* This method evaluates the writable space left in the current stream. If the space is insufficient
* for the required size, it triggers a flush operation by calling {@link #flushCommitAndResetStream(boolean)}
* with 'false' to indicate this is not a final operation. If there is adequate space,
* it returns a completed future with null.
*
* @param requiredSize The size required to write to the current stream.
* @return CompletableFuture<T> A future that completes immediately with null if there is enough space,
* or completes with the future returned by flushCommitAndResetStream if a flush is needed.
* @throws IOException if there are I/O errors when checking the stream's space or flushing.
*/
public CompletableFuture<T> flushIfNeeded(int requiredSize) throws IOException {
var spaceLeft = currentOutputStreamWriteableSpaceLeft();
if (spaceLeft != -1 && spaceLeft < requiredSize) {
Expand Down Expand Up @@ -172,7 +186,7 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber);
final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize;
// Ensure space is available before starting an observation
flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)).join();
flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1));
// e.g. 2 {
writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
// Write observation content length
Expand All @@ -191,41 +205,41 @@ private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOExc
}
}

// Optimistically calculates maxWriteableSpace taking into account lengthSpace. In some cases, this may
// yield a maxWriteableSpace with one leftover byte, however, this is still the correct max as attempting to write
// the additional byte would yield an additional length byte and overflow
public static int calculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
// Overestimate the lengthFieldSpace first to then correct in instances where writing one more byte does not result
// in as large a lengthFieldSpace. In instances where we must have a leftoverByte, it will be in the lengthFieldSpace
final int maxLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace);
final int initialEstimatedMaxWriteSpace = totalAvailableSpace - maxLengthFieldSpace;
final int lengthFieldSpaceForInitialEstimatedAndOneMoreByte = CodedOutputStream.computeUInt32SizeNoTag(initialEstimatedMaxWriteSpace + 1);

int maxWriteBytesSpace = totalAvailableSpace - lengthFieldSpaceForInitialEstimatedAndOneMoreByte;

return Math.min(maxWriteBytesSpace, requestedWriteableSpace);
}

// Similar to calculateMaxWritableSpace but perform pessimistic calculation with fewer operations. In some cases returns
// up to 1 byte fewer than what could be written out of the available space.
public static int pessimisticallyCalculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
/**
* Computes the maximum number of writable bytes for a length-delimited field within a given total available space.
* This method takes into account the space required for encoding the length of the field itself, which might reduce
* the writable space due to the encoding overhead.
*
* @param totalAvailableSpace The total available space in bytes that can be used for both the length field and the data.
* @param requestedFieldSize The desired number of writable bytes the caller wishes to use for the data, excluding the length field.
* @return The maximum number of bytes that can be written as data, which may be less than the requestedWritableSpace due to space
* taken by the length field and totalAvailableSpace. If a length delimited field is written of the returned size, and the returned
* length is less than requestedFieldSize, then there will be at most one byte of available space remaining. In some cases
* this byte is due to overestimation of lengthFieldSpace in which an optimal calculation would have returned one length
* greater, and in other cases it is due to the length returned being at the border of an increase in the lengthFieldSpace
* and thus an additional space on the return value would require two additional space to write.
*/
public static int computeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedFieldSize) {
// A pessimistic calculation space required for the length field due to not accounting for the space of the length field itself.
// This may yield a length space one byte greater than optimal, potentially leaving at most one length delimited byte
// which the availableSpace has space for.
final int pessimisticLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace);
int maxWriteBytesSpace = totalAvailableSpace - pessimisticLengthFieldSpace;
return Math.min(maxWriteBytesSpace, requestedWriteableSpace);
return Math.min(maxWriteBytesSpace, requestedFieldSize);
}

private void readByteBufIntoCurrentStream(int fieldNum, ByteBuf buf) throws IOException {
var codedOutputStream = getOrCreateCodedOutputStream();
if (buf.readableBytes() > 0) {
final int bufReadableLength = buf.readableBytes();
if (bufReadableLength > 0) {
// Here we are optimizing to reduce the number of internal copies and merges performed on the netty
// ByteBuf to write to the CodedOutputStream especially in cases of Composite and Direct ByteBufs. We
// can do this by delegating the individual ByteBuffer writes to netty which will retain the underlying
// structure. We also need to be careful with the exact CodedOutputStream operations performed to ensure
// we write until hitting ByteBuf/ByteBuffer writerIndex/limit and not their capacity since some
// CodedOutputStream operations write until capacity, e.g. CodedOutputStream::writeByteBuffer
codedOutputStream.writeTag(fieldNum, WireFormat.WIRETYPE_LENGTH_DELIMITED);
codedOutputStream.writeUInt32NoTag(buf.readableBytes());
buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), buf.readableBytes());
codedOutputStream.writeUInt32NoTag(bufReadableLength);
buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), bufReadableLength);
assert buf.readableBytes() == 0 : "Expected buf bytes read but instead left " + buf.readableBytes() + " unread.";
} else {
codedOutputStream.writeUInt32NoTag(0);
Expand All @@ -241,6 +255,17 @@ private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOE
}
}

/**
* Writes a record to the stream, flushes it, and begins its closure. This method synchronously sets up
* the closing process of the underlying stream and prepares the CodedOutputStreamHolder to return a new stream on next retrieval.
* Each invocation writes a record to signal the current state: the final chunk if 'isFinal' is true,
* otherwise a continuation. Returns a CompletableFuture that resolves upon the stream's closure.
*
* @param isFinal Indicates if this should be the final operation on the stream.
* @return CompletableFuture<T> A future that completes when the stream is closed. Returns null if already closed or no stream exists and 'isFinal' is false.
* @throws IOException if there are I/O errors during the operation.
*/

@Override
public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IOException {
if (streamHasBeenClosed || (currentCodedOutputStreamHolderOrNull == null && !isFinal)) {
Expand Down Expand Up @@ -342,26 +367,26 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
// Writing one data byte requires two bytes to account for length byte
final int maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte = trafficStreamOverhead + 2;

flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join();
flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte);
var spaceLeft = currentOutputStreamWriteableSpaceLeft();

var bufToRead = buf.duplicate();
// If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
if (buf.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
if (bufToRead.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft;
addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, buf.duplicate());
addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, bufToRead);
observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber);
} else {
var readBuffer = buf.duplicate();
while(readBuffer.readableBytes() > 0) {
// addSubstreamMessage will write until COS limit and flush prior if needed
while(bufToRead.readableBytes() > 0) {
spaceLeft = currentOutputStreamWriteableSpaceLeft();
var bytesToRead = pessimisticallyCalculateMaxWritableSpace(spaceLeft - trafficStreamOverhead, readBuffer.readableBytes());
var bytesToRead = computeMaxLengthDelimitedFieldSizeForSpace(spaceLeft - trafficStreamOverhead, bufToRead.readableBytes());
if (bytesToRead <= 0) {
throw new IllegalStateException("Stream space is not allowing forward progress on byteBuf reading");
}
var bufSliceToRead = readBuffer.readSlice(bytesToRead);
var bufSliceToRead = bufToRead.readSlice(bytesToRead);
addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bufSliceToRead);
if (readBuffer.readableBytes() > 0) {
flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join();
if (bufToRead.readableBytes() > 0) {
flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte);
}
}
writeEndOfSegmentMessage(timestamp);
Expand Down Expand Up @@ -530,11 +555,16 @@ public int write(ByteBuffer src) throws IOException {
}

@Override
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
for (int i = offset; i < offset + length; i++) {
write(srcs[i]);
}
return length;
public long write(ByteBuffer[] srcs, int offset, int length) throws UncheckedIOException {
return IntStream.range(offset, offset + length)
.mapToLong(i -> {

Check warning on line 560 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java#L559-L560

Added lines #L559 - L560 were not covered by tests
try {
return write(srcs[i]);
} catch (IOException e) {
throw new UncheckedIOException(e);

Check warning on line 564 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java#L562-L564

Added lines #L562 - L564 were not covered by tests
}
})
.sum();

Check warning on line 567 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java#L567

Added line #L567 was not covered by tests
}

@Override
Expand All @@ -544,12 +574,11 @@ public long write(ByteBuffer[] srcs) throws IOException {

@Override
public boolean isOpen() {
throw new UnsupportedOperationException();
return true;

Check warning on line 577 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java#L577

Added line #L577 was not covered by tests
}

@Override
public void close() {
throw new UnsupportedOperationException();
}

Check warning on line 582 in TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java

View check run for this annotation

Codecov / codecov/patch

TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java#L582

Added line #L582 was not covered by tests
}
}
Loading

0 comments on commit 61dbc8f

Please sign in to comment.