Skip to content

Commit

Permalink
Address PR Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <[email protected]>
  • Loading branch information
AndreKurait committed Mar 29, 2024
1 parent 0473569 commit 6e490f7
Showing 1 changed file with 24 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN
return d.findFieldByNumber(fieldNumber).getLiteType().getWireType();
}

private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
return getOrCreateCodedOutputStreamHolder().getOutputStream();
}

private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException {
if (streamHasBeenClosed) {
// In an abundance of caution, flip the state back to basically act like a whole new
Expand Down Expand Up @@ -137,12 +141,12 @@ public CompletableFuture<T> flushIfNeeded(Supplier<Integer> requiredSize) throws


private void writeTrafficStreamTag(int fieldNumber) throws IOException {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber,
getOrCreateCodedOutputStream().writeTag(fieldNumber,
getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber));
}

private void writeObservationTag(int fieldNumber) throws IOException {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber,
getOrCreateCodedOutputStream().writeTag(fieldNumber,
getWireTypeForFieldIndex(TrafficObservation.getDescriptor(), fieldNumber));
}

Expand All @@ -161,35 +165,35 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
// e.g. 2 {
writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
// Write observation content length
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(observationContentSize);
getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize);
// e.g. 1 { 1: 1234 2: 1234 }
writeTimestampForNowToCurrentStream(timestamp);
}

private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOException {
writeObservationTag(TrafficObservation.TS_FIELD_NUMBER);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp));
getOrCreateCodedOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp));

getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond());
getOrCreateCodedOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond());
if (timestamp.getNano() != 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano());
getOrCreateCodedOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano());
}
}

private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException {
if (byteBuffer.remaining() > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeByteBuffer(fieldNum, byteBuffer);
getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer);
} else {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0);
getOrCreateCodedOutputStream().writeUInt32NoTag(0);
}
}


private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException {
if (str.length() > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeString(fieldNum, str);
getOrCreateCodedOutputStream().writeString(fieldNum, str);
} else {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0);
getOrCreateCodedOutputStream().writeUInt32NoTag(0);
}
}

Expand All @@ -199,7 +203,7 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
return CompletableFuture.completedFuture(null);
}
try {
CodedOutputStream currentStream = getOrCreateCodedOutputStreamHolder().getOutputStream();
CodedOutputStream currentStream = getOrCreateCodedOutputStream();
var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER;
// e.g. 3: 1
currentStream.writeInt32(fieldNum, ++numFlushesSoFar);
Expand All @@ -219,7 +223,7 @@ public CompletableFuture<T> flushCommitAndResetStream(boolean isFinal) throws IO
@Override
public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER,
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER,
RequestIntentionallyDropped.getDefaultInstance());
this.readObservationsAreWaitingForEom = false;
this.firstLineByteLength = -1;
Expand All @@ -244,7 +248,7 @@ public void addDisconnectEvent(Instant timestamp) throws IOException {
@Override
public void addCloseEvent(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER,
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER,
CloseObservation.getDefaultInstance());
}

Expand All @@ -265,7 +269,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber,
// e.g. 4 {
writeObservationTag(captureFieldNumber);
if (dataSize > 0) {
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32NoTag(dataSize);
getOrCreateCodedOutputStream().writeInt32NoTag(dataSize);
}
writeByteStringToCurrentStream(dataFieldNumber, str);
}
Expand All @@ -292,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
flushIfNeeded(() -> trafficStreamOverhead);
flushIfNeeded(() -> (trafficStreamOverhead + 1));

// If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
Expand Down Expand Up @@ -329,7 +333,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in
int dataSize = 0;
int segmentCountSize = 0;
int captureClosureLength = 1;
CodedOutputStream codedOutputStream = getOrCreateCodedOutputStreamHolder().getOutputStream();
CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream();
if (dataCountFieldNumber > 0) {
segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount);
}
Expand Down Expand Up @@ -450,14 +454,14 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize);
// e.g. 15 {
writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(eomPairSize);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize);
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
}

private void writeEndOfSegmentMessage(Instant timestamp) throws IOException {
beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1);
getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
}

private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException {
Expand Down

0 comments on commit 6e490f7

Please sign in to comment.