Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Limitless CodedOutputStreams #543

Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Address PR Comments
Signed-off-by: Andre Kurait <[email protected]>
AndreKurait committed Mar 29, 2024

Verified

This commit was signed with the committer’s verified signature.
AndreKurait Andre Kurait
commit 6e490f7c84b97701bd68c5bb122ea645209b770e
Original file line number Diff line number Diff line change
@@ -92,6 +92,10 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN
return d.findFieldByNumber(fieldNumber).getLiteType().getWireType();
}

private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
return getOrCreateCodedOutputStreamHolder().getOutputStream();
}

private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException {
if (streamHasBeenClosed) {
// In an abundance of caution, flip the state back to basically act like a whole new
@@ -137,12 +141,12 @@ public CompletableFuture<T> flushIfNeeded(Supplier<Integer> requiredSize) throws


private void writeTrafficStreamTag(int fieldNumber) throws IOException {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber,
getOrCreateCodedOutputStream().writeTag(fieldNumber,
getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber));
}

private void writeObservationTag(int fieldNumber) throws IOException {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber,
getOrCreateCodedOutputStream().writeTag(fieldNumber,
getWireTypeForFieldIndex(TrafficObservation.getDescriptor(), fieldNumber));
}

@@ -161,35 +165,35 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
// e.g. 2 {
writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
// Write observation content length
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(observationContentSize);
getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize);
// e.g. 1 { 1: 1234 2: 1234 }
writeTimestampForNowToCurrentStream(timestamp);
}

private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOException {
writeObservationTag(TrafficObservation.TS_FIELD_NUMBER);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp));
getOrCreateCodedOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp));

getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond());
getOrCreateCodedOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond());
if (timestamp.getNano() != 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano());
getOrCreateCodedOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano());
}
}

private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException {
if (byteBuffer.remaining() > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeByteBuffer(fieldNum, byteBuffer);
getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer);
} else {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0);
getOrCreateCodedOutputStream().writeUInt32NoTag(0);
}
}


private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException {
if (str.length() > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeString(fieldNum, str);
getOrCreateCodedOutputStream().writeString(fieldNum, str);
} else {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0);
getOrCreateCodedOutputStream().writeUInt32NoTag(0);
}
}

@@ -199,7 +203,7 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
return CompletableFuture.completedFuture(null);
}
try {
CodedOutputStream currentStream = getOrCreateCodedOutputStreamHolder().getOutputStream();
CodedOutputStream currentStream = getOrCreateCodedOutputStream();
var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER;
// e.g. 3: 1
currentStream.writeInt32(fieldNum, ++numFlushesSoFar);
@@ -219,7 +223,7 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
@Override
public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER,
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER,
RequestIntentionallyDropped.getDefaultInstance());
this.readObservationsAreWaitingForEom = false;
this.firstLineByteLength = -1;
@@ -244,7 +248,7 @@ public void addDisconnectEvent(Instant timestamp) throws IOException {
@Override
public void addCloseEvent(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER,
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER,
CloseObservation.getDefaultInstance());
}

@@ -265,7 +269,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber,
// e.g. 4 {
writeObservationTag(captureFieldNumber);
if (dataSize > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32NoTag(dataSize);
getOrCreateCodedOutputStream().writeInt32NoTag(dataSize);
}
writeByteStringToCurrentStream(dataFieldNumber, str);
}
@@ -292,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
flushIfNeeded(() -> trafficStreamOverhead);
flushIfNeeded(() -> (trafficStreamOverhead + 1));

// If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
@@ -329,7 +333,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in
int dataSize = 0;
int segmentCountSize = 0;
int captureClosureLength = 1;
CodedOutputStream codedOutputStream = getOrCreateCodedOutputStreamHolder().getOutputStream();
CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream();
if (dataCountFieldNumber > 0) {
segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount);
}
@@ -450,14 +454,14 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize);
// e.g. 15 {
writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(eomPairSize);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize);
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
}

private void writeEndOfSegmentMessage(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
}

private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException {