diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 8b56e46468..19b667acdf 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -92,6 +92,10 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN return d.findFieldByNumber(fieldNumber).getLiteType().getWireType(); } + private CodedOutputStream getOrCreateCodedOutputStream() throws IOException { + return getOrCreateCodedOutputStream(); + } + private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException { if (streamHasBeenClosed) { // In an abundance of caution, flip the state back to basically act like a whole new @@ -137,12 +141,12 @@ public CompletableFuture flushIfNeeded(Supplier requiredSize) throws private void writeTrafficStreamTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber)); } private void writeObservationTag(int fieldNumber) throws IOException { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeTag(fieldNumber, + getOrCreateCodedOutputStream().writeTag(fieldNumber, getWireTypeForFieldIndex(TrafficObservation.getDescriptor(), fieldNumber)); } @@ -161,35 +165,35 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum // e.g. 2 { writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER); // Write observation content length - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(observationContentSize); + getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize); // e.g. 1 { 1: 1234 2: 1234 } writeTimestampForNowToCurrentStream(timestamp); } private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOException { writeObservationTag(TrafficObservation.TS_FIELD_NUMBER); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); + getOrCreateCodedOutputStream().writeUInt32NoTag(CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestamp)); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); + getOrCreateCodedOutputStream().writeInt64(Timestamp.SECONDS_FIELD_NUMBER, timestamp.getEpochSecond()); if (timestamp.getNano() != 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); + getOrCreateCodedOutputStream().writeInt32(Timestamp.NANOS_FIELD_NUMBER, timestamp.getNano()); } } private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException { if (byteBuffer.remaining() > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeByteBuffer(fieldNum, byteBuffer); + getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer); } else { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStream().writeUInt32NoTag(0); } } private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOException { if (str.length() > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeString(fieldNum, str); + getOrCreateCodedOutputStream().writeString(fieldNum, str); } else { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(0); + getOrCreateCodedOutputStream().writeUInt32NoTag(0); } } @@ -199,7 +203,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO return CompletableFuture.completedFuture(null); } try { - CodedOutputStream currentStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); + CodedOutputStream currentStream = getOrCreateCodedOutputStream(); var fieldNum = isFinal ? TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER : TrafficStream.NUMBER_FIELD_NUMBER; // e.g. 3: 1 currentStream.writeInt32(fieldNum, ++numFlushesSoFar); @@ -219,7 +223,7 @@ public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IO @Override public void cancelCaptureForCurrentRequest(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.REQUESTDROPPED_FIELD_NUMBER, RequestIntentionallyDropped.getDefaultInstance()); this.readObservationsAreWaitingForEom = false; this.firstLineByteLength = -1; @@ -244,7 +248,7 @@ public void addDisconnectEvent(Instant timestamp) throws IOException { @Override public void addCloseEvent(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.CLOSE_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.CLOSE_FIELD_NUMBER, CloseObservation.getDefaultInstance()); } @@ -265,7 +269,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, // e.g. 4 { writeObservationTag(captureFieldNumber); if (dataSize > 0) { - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32NoTag(dataSize); + getOrCreateCodedOutputStream().writeInt32NoTag(dataSize); } writeByteStringToCurrentStream(dataFieldNumber, str); } @@ -292,7 +296,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); // Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary. - flushIfNeeded(() -> trafficStreamOverhead); + flushIfNeeded(() -> (trafficStreamOverhead + 1)); // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); @@ -329,7 +333,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in int dataSize = 0; int segmentCountSize = 0; int captureClosureLength = 1; - CodedOutputStream codedOutputStream = getOrCreateCodedOutputStreamHolder().getOutputStream(); + CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream(); if (dataCountFieldNumber > 0) { segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount); } @@ -450,14 +454,14 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize); // e.g. 15 { writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeUInt32NoTag(eomPairSize); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); + getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize); + getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); + getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); } private void writeEndOfSegmentMessage(Instant timestamp) throws IOException { beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1); - getOrCreateCodedOutputStreamHolder().getOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); } private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException {