Favor ByteBuf Duplicates, NettyJsonContentAuthSigner and StreamChannelConnectionCaptureSerializer Refactoring #615

Merged May 3, 2024 (18 commits; changes shown from 16 commits)

Commits:
66f9e58  Prefer duplicate to slices (AndreKurait, Apr 26, 2024)
3d28553  Simplify ParsedHttpMessagesAsDicts Base64 Encoding (AndreKurait, Apr 26, 2024)
4bf9a0f  Fix release in NettyJsonContentCompressor (AndreKurait, Apr 26, 2024)
e2d7aaf  Refactor NettyJsonContentAuthSigner for safer operation (AndreKurait, Apr 26, 2024)
05271b2  Remove reader index modification in HttpJsonTransformingConsumer (AndreKurait, Apr 26, 2024)
daf2b23  Add duplicate before each byteBuf read (AndreKurait, Apr 26, 2024)
7943c26  Add StreamChannelConnectionCaptureSerializerTest for buffer with fewe… (AndreKurait, Apr 29, 2024)
5afd1a0  Fix StreamChannelConnectionCaptureSerializer behavior for nioBuffers … (AndreKurait, Apr 30, 2024)
bcc1c97  Log when NettyJsonContentAuthSigner fails to sign message (AndreKurait, May 1, 2024)
578cb0b  Fill headers before body in ParsedHttpMessagesAsDicts (AndreKurait, May 1, 2024)
171dea0  Add StreamChannelConnectionCaptureSerializerTests around segmentedObs… (AndreKurait, May 2, 2024)
9f78caf  Fix StreamChannelConnectionCaptureSerializer edge case around segment… (AndreKurait, May 2, 2024)
8bc47ca  Fix ResultsToLogsConsumerTest with body at end of tuple output (AndreKurait, May 2, 2024)
7fbd7a5  Add LeakDetection to StreamChannelConnectionCaptureSerializerTest (AndreKurait, May 2, 2024)
1b66165  Remove memory leaks in ConditionallyReliableLoggingHttpHandlerTest (AndreKurait, May 2, 2024)
0325b24  Add PessimisticallyCalculateMaxWritableSpace (AndreKurait, May 3, 2024)
61dbc8f  Address PR comments (AndreKurait, May 3, 2024)
7b270fa  Simplify TestUtils getStringFromContent (AndreKurait, May 3, 2024)
TrafficCapture/captureOffloader/build.gradle (1 addition, 0 deletions)
@@ -28,6 +28,7 @@ dependencies {
    implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.26'
    implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

+   testImplementation testFixtures(project(path: ':testUtilities'))
    testImplementation project(':coreUtilities')
    testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
    testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
CodedOutputStreamSizeUtil.java
@@ -2,10 +2,10 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
+import io.netty.buffer.ByteBuf;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

-import java.nio.ByteBuffer;
import java.time.Instant;

@@ -28,25 +28,41 @@ public static int getSizeOfTimestamp(Instant t) {

    /**
     * This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed
-    * from the given ByteBuffer and associated segment field numbers and values passed in. This estimate is essentially
-    * the max size needed in the CodedOutputStream to store the provided ByteBuffer data and its associated TrafficStream
+    * from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially
+    * the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream
     * overhead. The actual required bytes could be marginally smaller.
     */
    public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int observationFieldNumber,
-                                                            int dataFieldNumber, ByteBuffer buffer) {
+                                                            int dataFieldNumber, ByteBuf buf) {
        // Timestamp required bytes
        int tsContentSize = getSizeOfTimestamp(timestamp);
        int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;

        // Capture required bytes
-       int dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, buffer);
+       int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf);
        int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize;

        // Observation and closing index required bytes
        return bytesNeededForObservationAndClosingIndex(tsTagAndContentSize + captureTagAndContentSize,
            Integer.MAX_VALUE);
    }

    /**
     * This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag.
     */
    public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buffer) {
        return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buffer);
    }

    /**
     * This function determines the number of bytes needed to write the readable bytes in a byteBuf.
     */
    public static int computeByteBufRemainingSizeNoTag(ByteBuf buffer) {
        int bufferSize = buffer.readableBytes();
        return CodedOutputStream.computeUInt32SizeNoTag(bufferSize) + bufferSize;
    }
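The size arithmetic above can be modeled without the protobuf or Netty dependencies. A minimal standalone sketch (all names here are illustrative, and `varintSize` re-implements the logic of protobuf's `computeUInt32SizeNoTag`): the wire size of a length-delimited field body is the varint-encoded length plus the payload itself.

```java
// Standalone sketch of computeByteBufRemainingSizeNoTag's arithmetic.
public class FieldSizeSketch {
    // Size in bytes of an unsigned varint (7 payload bits per encoded byte).
    static int varintSize(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    // Mirrors computeByteBufRemainingSizeNoTag: readableBytes -> encoded size without tag.
    static int sizeNoTag(int readableBytes) {
        return varintSize(readableBytes) + readableBytes;
    }

    public static void main(String[] args) {
        System.out.println(sizeNoTag(0));     // 1: a zero length still costs one length byte
        System.out.println(sizeNoTag(127));   // 128: 1 length byte + 127 payload bytes
        System.out.println(sizeNoTag(128));   // 130: length 128 needs a 2-byte varint
    }
}
```

The jump from 128 to 130 at a 128-byte payload is exactly the length-prefix growth that the writable-space calculations later in this PR have to account for.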


/**
* This function determines the number of bytes needed to store a TrafficObservation and a closing index for a
* TrafficStream, from the provided input.
StreamChannelConnectionCaptureSerializer.java
@@ -1,12 +1,13 @@
package org.opensearch.migrations.trafficcapture;

+import com.google.protobuf.ByteOutput;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
+import com.google.protobuf.WireFormat;
import io.netty.buffer.ByteBuf;

-import java.util.function.IntSupplier;
-import java.util.function.Supplier;
+import java.nio.channels.GatheringByteChannel;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
@@ -133,9 +134,17 @@
        }
    }

-   public CompletableFuture<T> flushIfNeeded(IntSupplier requiredSize) throws IOException {
-       var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
-       if (spaceLeft != -1 && spaceLeft < requiredSize.getAsInt()) {
+   public int currentOutputStreamWriteableSpaceLeft() throws IOException {
+       // Writeable bytes is the space left minus the space needed to complete the next flush
+       var maxFieldNum = Math.max(TrafficStream.NUMBEROFTHISLASTCHUNK_FIELD_NUMBER, TrafficStream.NUMBER_FIELD_NUMBER);
+       var spaceNeededForNextFlush = CodedOutputStream.computeInt32Size(maxFieldNum, numFlushesSoFar + 1);
+       var outputStreamSpaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
+       return outputStreamSpaceLeft == -1 ? -1 : outputStreamSpaceLeft - spaceNeededForNextFlush;
+   }
+
+   public CompletableFuture<T> flushIfNeeded(int requiredSize) throws IOException {
+       var spaceLeft = currentOutputStreamWriteableSpaceLeft();
+       if (spaceLeft != -1 && spaceLeft < requiredSize) {
            return flushCommitAndResetStream(false);
        }
        return CompletableFuture.completedFuture(null);
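The reserve subtracted by `currentOutputStreamWriteableSpaceLeft` can be sketched standalone. In this hypothetical model (names are illustrative; `varintSize` and `tagSize` re-implement the arithmetic behind protobuf's `computeInt32Size`), the usable space is the raw stream space minus the bytes reserved to write the closing chunk-index field on the next flush:

```java
// Standalone model of the writable-space-left calculation.
public class WriteableSpaceLeftSketch {
    static int varintSize(long value) {
        int size = 1;
        while ((value & ~0x7FL) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    // A protobuf tag is (fieldNumber << 3) | wireType, itself varint-encoded.
    static int tagSize(int fieldNumber) {
        return varintSize(((long) fieldNumber) << 3);
    }

    // Tag plus varint value; the flush counters here are always non-negative.
    static int int32Size(int fieldNumber, int value) {
        return tagSize(fieldNumber) + varintSize(value);
    }

    // rawSpaceLeft of -1 means "unbounded stream"; otherwise subtract the reserve
    // needed to write the closing index field (numFlushesSoFar + 1) on the next flush.
    static int writeableSpaceLeft(int rawSpaceLeft, int maxFieldNum, int numFlushesSoFar) {
        int reserve = int32Size(maxFieldNum, numFlushesSoFar + 1);
        return rawSpaceLeft == -1 ? -1 : rawSpaceLeft - reserve;
    }

    public static void main(String[] args) {
        System.out.println(writeableSpaceLeft(100, 2, 0));  // 98: 1 tag byte + 1 value byte reserved
        System.out.println(writeableSpaceLeft(-1, 2, 0));   // -1: unbounded stream, nothing to reserve
    }
}
```

Reserving this space up front is what lets `flushIfNeeded` compare a plain `int` requirement against the remaining space without re-deriving the flush overhead at each call site.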
@@ -163,7 +172,7 @@
        final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber);
        final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize;
        // Ensure space is available before starting an observation
-       flushIfNeeded(() -> CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1));
+       flushIfNeeded(CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)).join();
        // e.g. 2 {
        writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
        // Write observation content length
@@ -182,11 +191,44 @@
        }
    }

-   private void writeByteBufferToCurrentStream(int fieldNum, ByteBuffer byteBuffer) throws IOException {
-       if (byteBuffer.remaining() > 0) {
-           getOrCreateCodedOutputStream().writeByteBuffer(fieldNum, byteBuffer);
+   // Optimistically calculates maxWriteableSpace taking into account lengthSpace. In some cases, this may
+   // yield a maxWriteableSpace with one leftover byte, however, this is still the correct max as attempting to write
+   // the additional byte would yield an additional length byte and overflow
+   public static int calculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
+       // Overestimate the lengthFieldSpace first to then correct in instances where writing one more byte does not result
+       // in as large a lengthFieldSpace. In instances where we must have a leftoverByte, it will be in the lengthFieldSpace
+       final int maxLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace);
+       final int initialEstimatedMaxWriteSpace = totalAvailableSpace - maxLengthFieldSpace;
+       final int lengthFieldSpaceForInitialEstimatedAndOneMoreByte = CodedOutputStream.computeUInt32SizeNoTag(initialEstimatedMaxWriteSpace + 1);
+
+       int maxWriteBytesSpace = totalAvailableSpace - lengthFieldSpaceForInitialEstimatedAndOneMoreByte;
+
+       return Math.min(maxWriteBytesSpace, requestedWriteableSpace);
+   }

+   // Similar to calculateMaxWritableSpace but performs a pessimistic calculation with fewer operations. In some cases it returns
+   // up to 1 byte fewer than what could be written out of the available space.
+   public static int pessimisticallyCalculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
+       final int pessimisticLengthFieldSpace = CodedOutputStream.computeUInt32SizeNoTag(totalAvailableSpace);
+       int maxWriteBytesSpace = totalAvailableSpace - pessimisticLengthFieldSpace;
+       return Math.min(maxWriteBytesSpace, requestedWriteableSpace);
+   }

Review thread on pessimisticallyCalculateMaxWritableSpace:
- AndreKurait (Member, author): @gregschohn, added, depended on, and tested pessimisticallyCalculateMaxWritableSpace instead of calculateMaxWritableSpace.
- Collaborator: This function calculates the max bytes that I could write to a protobuf stream, including the overhead to write the array, right? I'm thinking something like "calculateMaxArrayLengthForSpace" - something that indicates that exactly one piece of overhead is taken into account here, not the tag, no omission of the length. A clearer name (as hard as that is) will make this definition (which is now really easy to understand, thanks) and its applications more obvious.
- AndreKurait (Member, author): I'm thinking computeMaxLengthDelimitedFieldSizeForSpace, which is analogous to the (internal) computeLengthDelimitedFieldSize method in CodedOutputStream.
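The difference between the two calculations shows up only at varint boundaries. A standalone sketch (not the PR's code; `varintSize` re-implements protobuf's `computeUInt32SizeNoTag` so the example runs without the protobuf dependency, and the method names simply mirror the PR's):

```java
// Standalone sketch of the optimistic vs. pessimistic writable-space calculations.
public class WritableSpaceSketch {
    static int varintSize(int value) {
        int size = 1;
        while ((value & ~0x7F) != 0) {
            size++;
            value >>>= 7;
        }
        return size;
    }

    // Pessimistic: assume the length prefix is as large as one for totalAvailableSpace
    // itself. May leave up to one usable byte on the table, but can never overflow.
    static int pessimisticallyCalculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
        int lengthFieldSpace = varintSize(totalAvailableSpace);
        return Math.min(totalAvailableSpace - lengthFieldSpace, requestedWriteableSpace);
    }

    // Optimistic: correct the estimate when writing one more byte would not grow the prefix.
    static int calculateMaxWritableSpace(int totalAvailableSpace, int requestedWriteableSpace) {
        int maxLengthFieldSpace = varintSize(totalAvailableSpace);
        int initialEstimate = totalAvailableSpace - maxLengthFieldSpace;
        int lengthFieldForOneMore = varintSize(initialEstimate + 1);
        return Math.min(totalAvailableSpace - lengthFieldForOneMore, requestedWriteableSpace);
    }

    public static void main(String[] args) {
        // 128 bytes available: pessimistic assumes a 2-byte prefix (126 payload bytes),
        // but 127 payload bytes + a 1-byte prefix fits exactly, which optimistic finds.
        System.out.println(pessimisticallyCalculateMaxWritableSpace(128, 1000));  // 126
        System.out.println(calculateMaxWritableSpace(128, 1000));                 // 127
        // Away from the boundary both agree.
        System.out.println(pessimisticallyCalculateMaxWritableSpace(200, 1000));  // 198
        System.out.println(calculateMaxWritableSpace(200, 1000));                 // 198
    }
}
```

The one-byte gap at the boundary is the trade the PR accepts for the simpler, always-safe pessimistic formula.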

+   private void readByteBufIntoCurrentStream(int fieldNum, ByteBuf buf) throws IOException {
+       var codedOutputStream = getOrCreateCodedOutputStream();
+       if (buf.readableBytes() > 0) {
+           // Here we are optimizing to reduce the number of internal copies and merges performed on the netty
+           // ByteBuf to write to the CodedOutputStream especially in cases of Composite and Direct ByteBufs. We
+           // can do this by delegating the individual ByteBuffer writes to netty which will retain the underlying
+           // structure. We also need to be careful with the exact CodedOutputStream operations performed to ensure
+           // we write until hitting ByteBuf/ByteBuffer writerIndex/limit and not their capacity since some
+           // CodedOutputStream operations write until capacity, e.g. CodedOutputStream::writeByteBuffer
+           codedOutputStream.writeTag(fieldNum, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+           codedOutputStream.writeUInt32NoTag(buf.readableBytes());
+           buf.readBytes(new ByteOutputGatheringByteChannel(codedOutputStream), buf.readableBytes());
+           assert buf.readableBytes() == 0 : "Expected buf bytes read but instead left " + buf.readableBytes() + " unread.";
        } else {
-           getOrCreateCodedOutputStream().writeUInt32NoTag(0);
+           codedOutputStream.writeUInt32NoTag(0);
        }
    }

Review comment (Collaborator), on the optimization comment: I would roll this comment so that it's after the next two code lines.

@@ -276,8 +318,7 @@
        writeByteStringToCurrentStream(dataFieldNumber, str);
    }

-   private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buffer) throws IOException {
-       var byteBuffer = buffer.nioBuffer();
+   private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp, ByteBuf buf) throws IOException {

Review comment (Collaborator): woohoo! No more nioBuffer() call!
int segmentFieldNumber;
int segmentDataFieldNumber;
if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) {
Expand All @@ -293,54 +334,53 @@
// the potentially required bytes for simplicity. This could leave ~5 bytes of unused space in the CodedOutputStream
// when considering the case of a message that does not need segments or for the case of a smaller segment created
// from a much larger message
int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp,
segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();
final int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this taking the current buf into account? If we can write a fixed number of bytes (1?) in the next observation, won't we proceed with a partial? If we can't write an observation, we'll move on. Why do we need the initial size?

segmentFieldNumber, segmentDataFieldNumber, buf);
final int dataSize = CodedOutputStreamSizeUtil.computeByteBufRemainingSizeNoTag(buf);
final int trafficStreamOverhead = messageAndOverheadBytesLeft - dataSize;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this comment is still valid. See the one two earlier.

rename to something more like overheadAvailableForRestOfTrafficStreamAfterAddingBuf? As it is, it feels like this is more for what we need for a trafficStream.
A comment with a schematic and labels might help. e.g.

TrafficStream: SIZE=... (min/max range?)
 Any other components that we might care about to be called out separately...
 Observation:
   Timestamp:
   Tag:
   Data: (optional)
     length:
     bytes:

That isn't even quite right, but my hope is that if you have things really clearly defined and labeled, we might be able to come up with some conventions for how to define different computed values.


// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
flushIfNeeded(() -> (trafficStreamOverhead + 1));
// Writing one data byte requires two bytes to account for length byte
final int maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte = trafficStreamOverhead + 2;

flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join();
+       var spaceLeft = currentOutputStreamWriteableSpaceLeft();
        // If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
-       var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
-       if (byteBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
+       if (buf.readableBytes() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
            int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft;
-           addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, byteBuffer);
+           addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, buf.duplicate());
            observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber);
            return;
        }

Review comment (Collaborator), on buf.duplicate(): nit, you could duplicate before the branch since both branches need it.
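The reason `duplicate()` is safe to hand to the consumer here is that a duplicate shares the underlying bytes but keeps its own read/write indices. As a hypothetical illustration using `java.nio.ByteBuffer.duplicate()` (whose position/limit semantics parallel Netty's `ByteBuf.duplicate()` reader/writer indices, without requiring the Netty dependency):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DuplicateSketch {
    // Consume a duplicate fully; return the original's remaining bytes afterwards.
    static int remainingAfterConsumingDuplicate(ByteBuffer original) {
        ByteBuffer dup = original.duplicate();  // shared bytes, independent position/limit
        dup.position(dup.limit());              // the consumer "reads" the duplicate to exhaustion
        return original.remaining();            // unchanged: the duplicate's reads never move these indices
    }

    public static void main(String[] args) {
        ByteBuffer original = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
        System.out.println(remainingAfterConsumingDuplicate(original));  // 7
    }
}
```

This is the property behind the PR title: passing duplicates lets callees read buffers to exhaustion without mutating the caller's view.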

-       while(byteBuffer.position() < byteBuffer.limit()) {
-           // COS checked for unbounded limit above
-           int availableCOSSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
-           int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position();
-           ByteBuffer bb = byteBuffer.slice();
-           bb.limit(chunkBytes);
-           bb = bb.slice();
-           byteBuffer.position(byteBuffer.position() + chunkBytes);
-           addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bb);
-           int minExpectedSpaceAfterObservation = availableCOSSpace - chunkBytes - trafficStreamOverhead;
-           observationSizeSanityCheck(minExpectedSpaceAfterObservation, segmentFieldNumber);
-           // 1 to N-1 chunked messages
-           if (byteBuffer.position() < byteBuffer.limit()) {
-               flushCommitAndResetStream(false);
-               messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes;
-           } else {
-               writeEndOfSegmentMessage(timestamp);
-           }
-       }
+       var readBuffer = buf.duplicate();
+       while(readBuffer.readableBytes() > 0) {
+           // addSubstreamMessage will write until COS limit and flush prior if needed
+           spaceLeft = currentOutputStreamWriteableSpaceLeft();
+           var bytesToRead = pessimisticallyCalculateMaxWritableSpace(spaceLeft - trafficStreamOverhead, readBuffer.readableBytes());
+           if (bytesToRead <= 0) {
+               throw new IllegalStateException("Stream space is not allowing forward progress on byteBuf reading");
+           }
+           var bufSliceToRead = readBuffer.readSlice(bytesToRead);
+           addSubstreamMessage(segmentFieldNumber, segmentDataFieldNumber, timestamp, bufSliceToRead);
+           if (readBuffer.readableBytes() > 0) {
+               flushIfNeeded(maxBytesNeededForOneSegmentWithOneDataByteWithLengthByte).join();
+           }
+       }
+       writeEndOfSegmentMessage(timestamp);
    }
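The shape of the new chunking loop can be sketched standalone. In this hypothetical model (names mirror the PR, but a cursor over a byte count stands in for the ByteBuf duplicate, the stream resets to full capacity after each flush, and the length prefix is fixed at 1 byte since the chunks stay under 128 bytes):

```java
import java.util.ArrayList;
import java.util.List;

public class ChunkingSketch {
    // Split a payload into per-observation chunk sizes, the way the new loop
    // sizes each readSlice from the remaining stream space.
    static List<Integer> chunkSizes(int payloadBytes, int streamCapacity, int trafficStreamOverhead) {
        List<Integer> chunks = new ArrayList<>();
        int remaining = payloadBytes;
        while (remaining > 0) {
            int spaceLeft = streamCapacity;            // fresh stream after each flush
            int usable = spaceLeft - trafficStreamOverhead;
            int lengthPrefix = 1;                      // pessimistic 1-byte varint length prefix
            int bytesToRead = Math.min(usable - lengthPrefix, remaining);
            if (bytesToRead <= 0) {
                throw new IllegalStateException("no forward progress");
            }
            chunks.add(bytesToRead);
            remaining -= bytesToRead;                  // readSlice advances the reader index
        }
        return chunks;
    }

    public static void main(String[] args) {
        // 100-byte payload, 50-byte streams, 10 bytes of per-observation overhead:
        // each full chunk carries 50 - 10 - 1 = 39 bytes.
        System.out.println(chunkSizes(100, 50, 10));  // [39, 39, 22]
    }
}
```

The guard that throws on `bytesToRead <= 0` corresponds to the PR's `IllegalStateException`: if the overhead alone exceeds the stream capacity, no flush can ever make progress.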

    private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount,
-                                    Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
-       int dataSize = 0;
+                                    Instant timestamp, ByteBuf byteBuf) throws IOException {
+       int dataBytesSize = 0;
+       int dataTagSize = 0;
+       int dataSize = dataBytesSize + dataTagSize;
        int segmentCountSize = 0;
        int captureClosureLength = 1;
        CodedOutputStream codedOutputStream = getOrCreateCodedOutputStream();
        if (dataCountFieldNumber > 0) {
            segmentCountSize = CodedOutputStream.computeInt32Size(dataCountFieldNumber, dataCount);
        }
-       if (byteBuffer.remaining() > 0) {
-           dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, byteBuffer);
+       if (byteBuf.readableBytes() > 0) {
+           dataSize = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(dataFieldNumber, byteBuf);
            captureClosureLength = CodedOutputStream.computeInt32SizeNoTag(dataSize + segmentCountSize);
        }
        beginSubstreamObservation(timestamp, captureFieldNumber, captureClosureLength + dataSize + segmentCountSize);
@@ -355,16 +395,16 @@
            codedOutputStream.writeInt32(dataCountFieldNumber, dataCount);
        }
        // Write data field
-       writeByteBufferToCurrentStream(dataFieldNumber, byteBuffer);
+       readByteBufIntoCurrentStream(dataFieldNumber, byteBuf);
        if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER ||
            captureFieldNumber == TrafficObservation.READSEGMENT_FIELD_NUMBER) {
            this.readObservationsAreWaitingForEom = true;
        }
    }

    private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp,
-                                    java.nio.ByteBuffer byteBuffer) throws IOException {
-       addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuffer);
+                                    ByteBuf byteBuf) throws IOException {
+       addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuf);
    }

@Override
@@ -467,11 +507,49 @@
    }

    private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException {
-       int actualRemainingSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
+       int actualRemainingSpace = currentOutputStreamWriteableSpaceLeft();
if (actualRemainingSpace != -1 && (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0)) {
log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " +
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
}
}

    private static class ByteOutputGatheringByteChannel implements GatheringByteChannel {
        final ByteOutput byteOutput;

        public ByteOutputGatheringByteChannel(ByteOutput byteOutput) {
            this.byteOutput = byteOutput;
        }

        @Override
        public int write(ByteBuffer src) throws IOException {
            var bytesToWrite = src.remaining();
            byteOutput.write(src);
            return bytesToWrite - src.remaining();
        }

        @Override
        public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
            for (int i = offset; i < offset + length; i++) {
                write(srcs[i]);
            }
            return length;
        }

        @Override
        public long write(ByteBuffer[] srcs) throws IOException {
            return write(srcs, 0, srcs.length);
        }

        @Override
        public boolean isOpen() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
            throw new UnsupportedOperationException();
        }
    }
}

Codecov flagged the added lines in the array write methods, isOpen(), and close() of StreamChannelConnectionCaptureSerializer.java as not covered by tests.

Review comments on ByteOutputGatheringByteChannel:
- Collaborator, on the write(ByteBuffer[], int, int) loop: nit - this could be a one-liner with the help of Arrays.
- Collaborator, on "return length;": This value is incorrect - you need to accumulate the sizes and return the bytes written... Returns: The number of bytes written, possibly zero.
- AndreKurait (Member, author): Thanks!
- Collaborator, on close(): why would you penalize somebody for calling close()? It's more future-proof to just leave this as a noop.
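The accumulation fix the reviewer asked for can be sketched standalone. In this hypothetical version (an in-memory sink stands in for protobuf's ByteOutput so the example is self-contained), the gathering write returns the total bytes actually consumed rather than the array-slice length:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

public class GatheringWriteSketch {
    final ByteArrayOutputStream sink = new ByteArrayOutputStream();

    // Single-buffer write: consume the buffer and report bytes consumed.
    int write(ByteBuffer src) {
        int bytesToWrite = src.remaining();
        byte[] tmp = new byte[bytesToWrite];
        src.get(tmp);                              // consumes src, like ByteOutput.write(src)
        sink.write(tmp, 0, bytesToWrite);
        return bytesToWrite - src.remaining();     // bytes actually consumed
    }

    // Gathering write: accumulate per-buffer counts, per the review comment,
    // instead of returning the buffer-count `length`.
    long write(ByteBuffer[] srcs, int offset, int length) {
        long total = 0;
        for (int i = offset; i < offset + length; i++) {
            total += write(srcs[i]);
        }
        return total;
    }

    public static void main(String[] args) {
        var channel = new GatheringWriteSketch();
        var buffers = new ByteBuffer[] {
            ByteBuffer.wrap(new byte[] {1, 2, 3}),
            ByteBuffer.wrap(new byte[] {4, 5})
        };
        System.out.println(channel.write(buffers, 0, 2));  // 5, not the array length 2
    }
}
```

This matches the GatheringByteChannel contract, which specifies the return value as the number of bytes written, possibly zero.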