From 61dbc8ff94e00e6fe0d81e62b9b4a7edc669dd06 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Fri, 3 May 2024 11:15:41 -0500 Subject: [PATCH] Address PR comments Signed-off-by: Andre Kurait --- .../CodedOutputStreamSizeUtil.java | 10 +- ...eamChannelConnectionCaptureSerializer.java | 117 +++++++++++------- ...hannelConnectionCaptureSerializerTest.java | 64 +++++++--- .../http/NettyJsonContentAuthSigner.java | 6 +- 4 files changed, 129 insertions(+), 68 deletions(-) diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java index efa11ed1a..50fdcf103 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java @@ -50,16 +50,16 @@ public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int /** * This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag. */ - public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buffer) { - return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buffer); + public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buf) { + return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buf); } /** * This function determines the number of bytes needed to write the readable bytes in a byteBuf. */ - public static int computeByteBufRemainingSizeNoTag(ByteBuf buffer) { - int bufferSize = buffer.readableBytes(); - return CodedOutputStream.computeUInt32SizeNoTag(bufferSize) + bufferSize; + public static int computeByteBufRemainingSizeNoTag(ByteBuf buf) { + int bufSize = buf.readableBytes(); + return CodedOutputStream.computeUInt32SizeNoTag(bufSize) + bufSize; } diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 47264bbce..345c31f1d 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -7,7 +7,9 @@ import com.google.protobuf.WireFormat; import io.netty.buffer.ByteBuf; +import java.io.UncheckedIOException; import java.nio.channels.GatheringByteChannel; +import java.util.stream.IntStream; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; @@ -136,12 +138,24 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx public int currentOutputStreamWriteableSpaceLeft() throws IOException { // Writeable bytes is the space left minus the space needed to complete the next flush - var maxFieldNum = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER); - var spaceNeededForNextFlush = CodedOutputStream.computeInt32Size(maxFieldNum, numFlushesSoFar + 1); + var maxFieldTagNumberToBeWrittenUponStreamFlush = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER); + var spaceNeededForRecordCreationDuringNextFlush = CodedOutputStream.computeInt32Size(maxFieldTagNumberToBeWrittenUponStreamFlush, numFlushesSoFar + 1); var outputStreamSpaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft(); - return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForNextFlush; + return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForRecordCreationDuringNextFlush; } + /** + * Checks if the current output stream has enough space for the required size and flushes if not. + * This method evaluates the writable space left in the current stream. If the space is insufficient + * for the required size, it triggers a flush operation by calling {@link #flushCommitAndResetStream(boolean)} + * with 'false' to indicate this is not a final operation. If there is adequate space, + * it returns a completed future with null. + * + * @param requiredSize The size required to write to the current stream. + * @return CompletableFuture A future that completes immediately with null if there is enough space, + * or completes with the future returned by flushCommitAndResetStream if a flush is needed. + * @throws IOException if there are I/O errors when checking the stream's space or flushing. + */ public CompletableFuture flushIfNeeded(int requiredSize) throws IOException { var spaceLeft = currentOutputStreamWriteableSpaceLeft(); if (spaceLeft != -1 && spaceLeft < requiredSize) { @@ -172,7 +186,7 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber); final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize; // Ensure space is available before starting an observation - flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)).join(); + flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)); // e.g. 2 { writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER); // Write observation content length @@ -191,32 +205,32 @@ private void writeTimestampForNowToCurrentStream(Instant timestamp) throws IOExc } } - // Optimistically calculates maxWriteableSpace taking into account lengthSpace. In some cases, this may - // yield a maxWriteableSpace with one leftover byte, however, this is still the correct max as attempting to write - // the additional byte would yield an additional length byte and overflow - public static int calculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) { - // Overestimate the lengthFieldSpace first to then correct in instances where writing one more byte does not result - // in as large a lengthFieldSpace. In instances where we must have a leftoverByte, it will be in the lengthFieldSpace - final int maxLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace); - final int initialEstimatedMaxWriteSpace = totalAvailableSpace - maxLengthFieldSpace; - final int lengthFieldSpaceForInitialEstimatedAndOneMoreByte = CodedOutputStream.computeUInt32SizeNoTag(initialEstimatedMaxWriteSpace + 1); - - int maxWriteBytesSpace = totalAvailableSpace - lengthFieldSpaceForInitialEstimatedAndOneMoreByte; - - return Math.min(maxWriteBytesSpace, requestedWriteableSpace); - } - - // Similar to calculateMaxWritableSpace but perform pessimistic calculation with fewer operations. In some cases returns - // up to 1 byte fewer than what could be written out of the available space. - public static int pessimisticallyCalculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) { + /** + * Computes the maximum number of writable bytes for a length-delimited field within a given total available space. + * This method takes into account the space required for encoding the length of the field itself, which might reduce + * the writable space due to the encoding overhead. + * + * @param totalAvailableSpace The total available space in bytes that can be used for both the length field and the data. + * @param requestedFieldSize The desired number of writable bytes the caller wishes to use for the data, excluding the length field. + * @return The maximum number of bytes that can be written as data, which may be less than the requestedWritableSpace due to space + * taken by the length field and totalAvailableSpace. If a length delimited field is written of the returned size, and the returned + * length is less than requestedFieldSize, then there will be at most one byte of available space remaining. In some cases + * this byte is due to overestimation of lengthFieldSpace in which an optimal calculation would have returned one length + * greater, and in other cases it is due to the length returned being at the border of an increase in the lengthFieldSpace + * and thus an additional space on the return value would require two additional space to write. + */ + public static int computeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedFieldSize) { + // A pessimistic calculation space required for the length field due to not accounting for the space of the length field itself. + // This may yield a length space one byte greater than optimal, potentially leaving at most one length delimited byte + // which the availableSpace has space for. final int pessimisticLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace); int maxWriteBytesSpace = totalAvailableSpace - pessimisticLengthFieldSpace; - return Math.min(maxWriteBytesSpace, requestedWriteableSpace); + return Math.min(maxWriteBytesSpace, requestedFieldSize); } - private void readByteBufIntoCurrentStream(int fieldNum, ByteBuf buf) throws IOException { var codedOutputStream = getOrCreateCodedOutputStream(); - if (buf.readableBytes() > 0) { + final int bufReadableLength = buf.readableBytes(); + if (bufReadableLength > 0) { // Here we are optimizing to reduce the number of internal copies and merges performed on the netty // ByteBuf to write to the CodedOutputStream especially in cases of Composite and Direct ByteBufs. We // can do this by delegating the individual ByteBuffer writes to netty which will retain the underlying @@ -224,8 +238,8 @@ private void readByteBufIntoCurrentStream(int fieldNum, ByteBuf buf) throws IOEx // we write until hitting ByteBuf/ByteBuffer writerIndex/limit and not their capacity since some // CodedOutputStream operations write until capacity, e.g. CodedOutputStream::writeByteBuffer codedOutputStream.writeTag(fieldNum, WireFormat.WIRETYPE_LENGTH_DELIMITED); - codedOutputStream.writeUInt32NoTag(buf.readableBytes()); - buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), buf.readableBytes()); + codedOutputStream.writeUInt32NoTag(bufReadableLength); + buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), bufReadableLength); assert buf.readableBytes() == 0 : "Expected buf bytes read but instead left " + buf.readableBytes() + " unread."; } else { codedOutputStream.writeUInt32NoTag(0); @@ -241,6 +255,17 @@ private void writeByteStringToCurrentStream(int fieldNum, String str) throws IOE } } + /** + * Writes a record to the stream, flushes it, and begins its closure. This method synchronously sets up + * the closing process of the underlying stream and prepares the CodedOutputStreamHolder to return a new stream on next retrieval. + * Each invocation writes a record to signal the current state: the final chunk if 'isFinal' is true, + * otherwise a continuation. Returns a CompletableFuture that resolves upon the stream's closure. + * + * @param isFinal Indicates if this should be the final operation on the stream. + * @return CompletableFuture A future that completes when the stream is closed. Returns null if already closed or no stream exists and 'isFinal' is false. + * @throws IOException if there are I/O errors during the operation. + */ + @Override public CompletableFuture flushCommitAndResetStream(boolean isFinal) throws IOException { if (streamHasBeenClosed || (currentCodedOutputStreamHolderOrNull == null && !isFinal)) { @@ -342,26 +367,26 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant // Writing one data byte requires two bytes to account for length byte final int maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte = trafficStreamOverhead + 2; - flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join(); + flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte); var spaceLeft = currentOutputStreamWriteableSpaceLeft(); + + var bufToRead = buf.duplicate(); // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue - if (buf.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) { + if (bufToRead.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) { int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft; - addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, buf.duplicate()); + addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, bufToRead); observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber); } else { - var readBuffer = buf.duplicate(); - while(readBuffer.readableBytes() > 0) { - // addSubstreamMessage will write until COS limit and flush prior if needed + while(bufToRead.readableBytes() > 0) { spaceLeft = currentOutputStreamWriteableSpaceLeft(); - var bytesToRead = pessimisticallyCalculateMaxWritableSpace(spaceLeft - trafficStreamOverhead, readBuffer.readableBytes()); + var bytesToRead = computeMaxLengthDelimitedFieldSizeForSpace(spaceLeft - trafficStreamOverhead, bufToRead.readableBytes()); if (bytesToRead <= 0) { throw new IllegalStateException("Stream space is not allowing forward progress on byteBuf reading"); } - var bufSliceToRead = readBuffer.readSlice(bytesToRead); + var bufSliceToRead = bufToRead.readSlice(bytesToRead); addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bufSliceToRead); - if (readBuffer.readableBytes() > 0) { - flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join(); + if (bufToRead.readableBytes() > 0) { + flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte); } } writeEndOfSegmentMessage(timestamp); @@ -530,11 +555,16 @@ public int write(ByteBuffer src) throws IOException { } @Override - public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { - for (int i = offset; i < offset + length; i++) { - write(srcs[i]); - } - return length; + public long write(ByteBuffer[] srcs, int offset, int length) throws UncheckedIOException { + return IntStream.range(offset, offset + length) + .mapToLong(i -> { + try { + return write(srcs[i]); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }) + .sum(); } @Override @@ -544,12 +574,11 @@ public long write(ByteBuffer[] srcs) throws IOException { @Override public boolean isOpen() { - throw new UnsupportedOperationException(); + return true; } @Override public void close() { - throw new UnsupportedOperationException(); } } } diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 84e1b084b..83445d4c7 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -520,35 +520,60 @@ public void testAssertionErrorDuringInitializationWhenInitializeWithTooLargeId() "268435461,100000000", "268435462,100000000" }) - public void testCalculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) { - var calculatedMaxWritableSpace = StreamChannelConnectionCaptureSerializer.calculateMaxWritableSpace(totalAvailableSpace, + public void test_computeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedWriteableSpace) { + var optimalMaxWriteableSpace = optimalComputeMaxLengthDelimitedFieldSizeForSpace(totalAvailableSpace, requestedWriteableSpace); - Assertions.assertTrue(calculatedMaxWritableSpace <= requestedWriteableSpace, "cannot write more bytes than requested"); + Assertions.assertTrue(optimalMaxWriteableSpace <= requestedWriteableSpace, "cannot write more bytes than requested"); var spaceLeftAfterWrite = totalAvailableSpace - - CodedOutputStream.computeInt32SizeNoTag(calculatedMaxWritableSpace) - - calculatedMaxWritableSpace; + - CodedOutputStream.computeInt32SizeNoTag(optimalMaxWriteableSpace) + - optimalMaxWriteableSpace; Assertions.assertTrue(spaceLeftAfterWrite >= 0, "expected non-negative space left"); + if (optimalMaxWriteableSpace < requestedWriteableSpace) { + Assertions.assertTrue(spaceLeftAfterWrite <= 1, "expected space left to be no more than 1 if" + + "not enough space for requestedWriteableSpace"); + } - if (calculatedMaxWritableSpace < requestedWriteableSpace) { + if (optimalMaxWriteableSpace < requestedWriteableSpace) { // If we are not writing all requestedWriteableSpace verify there is not space for one more byte var expectedSpaceLeftAfterWriteIfOneMoreByteWrote = totalAvailableSpace - - CodedOutputStream.computeInt32SizeNoTag(calculatedMaxWritableSpace + 1) - - (calculatedMaxWritableSpace + 1); + - CodedOutputStream.computeInt32SizeNoTag(optimalMaxWriteableSpace) + - CodedOutputStream.computeInt32SizeNoTag(optimalMaxWriteableSpace + 1) + - (optimalMaxWriteableSpace + 1); Assertions.assertTrue(expectedSpaceLeftAfterWriteIfOneMoreByteWrote < 0, "expected no space to write one more byte"); } - // Test that when PessimisticallyCalculateMaxWritableSpace != calculatedMaxWritableSpace, then + // Test that when maxWriteableSpace != optimalMaxWriteableSpace, then // it is positive and equal to calculateMaxWritableSpace - 1 - var pessimisticWriteableSpace = StreamChannelConnectionCaptureSerializer.pessimisticallyCalculateMaxWritableSpace(totalAvailableSpace, + var maxWriteableSpace = StreamChannelConnectionCaptureSerializer.computeMaxLengthDelimitedFieldSizeForSpace(totalAvailableSpace, requestedWriteableSpace); - if (pessimisticWriteableSpace != calculatedMaxWritableSpace) { - Assertions.assertTrue(pessimisticWriteableSpace > 0); - Assertions.assertEquals(calculatedMaxWritableSpace - 1, pessimisticWriteableSpace); + if (maxWriteableSpace != optimalMaxWriteableSpace) { + Assertions.assertTrue(maxWriteableSpace > 0); + Assertions.assertEquals(optimalMaxWriteableSpace - 1, maxWriteableSpace); + var spaceLeftIfWritten = totalAvailableSpace - CodedOutputStream.computeInt32SizeNoTag(maxWriteableSpace) - maxWriteableSpace; + if (maxWriteableSpace < requestedWriteableSpace) { + Assertions.assertTrue(spaceLeftIfWritten <= 1, "expected pessimistic space left to be no more than 1 if" + + "not enough space for requestedWriteableSpace"); + } } } + // Optimally calculates maxWriteableSpace taking into account lengthSpace. In some cases, this may + // yield a maxWriteableSpace with one leftover byte, however, this is still the correct max as attempting to write + // the additional byte would yield an additional length byte and overflow + public static int optimalComputeMaxLengthDelimitedFieldSizeForSpace(int totalAvailableSpace, int requestedWriteableSpace) { + // Overestimate the lengthFieldSpace first to then correct in instances where writing one more byte does not result + // in as large a lengthFieldSpace. In instances where we must have a leftoverByte, it will be in the lengthFieldSpace + final int maxLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace); + final int initialEstimatedMaxWriteSpace = totalAvailableSpace - maxLengthFieldSpace; + final int lengthFieldSpaceForInitialEstimatedAndOneMoreByte = CodedOutputStream.computeUInt32SizeNoTag(initialEstimatedMaxWriteSpace + 1); + + int maxWriteBytesSpace = totalAvailableSpace - lengthFieldSpaceForInitialEstimatedAndOneMoreByte; + + return Math.min(maxWriteBytesSpace, requestedWriteableSpace); + } + @Test public void testInitializationWithRealIds() { final String realNodeId = "b671d2f2-577b-414e-9eb4-8bc3e89ee182"; @@ -590,18 +615,23 @@ protected CompletableFuture kickoffCloseStream(CodedOutputStreamHolder out "Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder); } var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder; - log.atTrace().log(() -> "Getting ready to flush for " + osh); - log.atTrace().log(() -> "Bytes written so far... " + StandardCharsets.UTF_8.decode(osh.getByteBuffer().duplicate())); + log.atTrace().setMessage("Getting ready to flush for {}").addArgument(osh).log(); + log.atTrace().setMessage("Bytes written so far... {}") + .addArgument(() -> StandardCharsets.UTF_8.decode(osh.getByteBuffer().duplicate())).log(); return CompletableFuture.runAsync(() -> { try { osh.getOutputStream().flush(); - log.atTrace().log(() -> "Just flushed for " + osh.getOutputStream()); + log.atTrace().setMessage("Just flushed for {}") + .addArgument(osh.getOutputStream()) + .log(); var bb = osh.getByteBuffer(); bb.position(0); var bytesWritten = osh.getOutputStream().getTotalBytesWritten(); bb.limit(bytesWritten); - log.atTrace().log(() -> "Adding " + StandardCharsets.UTF_8.decode(bb.duplicate())); + log.atTrace().setMessage("Adding {}") + .addArgument(() -> StandardCharsets.UTF_8.decode(bb.duplicate())) + .log(); outputBuffers.add(bb); } catch (IOException e) { throw Lombok.sneakyThrow(e); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java index 8b6c72702..e0073f7c3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/http/NettyJsonContentAuthSigner.java @@ -54,7 +54,8 @@ private boolean flushDownstream(ChannelHandlerContext ctx) { public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { boolean messageFlushed = flushDownstream(ctx); if (messageFlushed) { - log.atWarn().setMessage(() -> "Failed to sign message due to handler removed").log(); + log.atWarn().setMessage(() -> "Failed to sign message due to handler removed" + + " before the end of the http contents were received").log(); } super.handlerRemoved(ctx); } @@ -63,7 +64,8 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { boolean messageFlushed = flushDownstream(ctx); if (messageFlushed) { - log.atWarn().setMessage(() -> "Failed to sign message due to channel unregistered").log(); + log.atWarn().setMessage(() -> "Failed to sign message due to channel unregistered" + + " before the end of the http contents were received").log(); } super.channelUnregistered(ctx); }