Skip to content

Commit

Permalink
Merge pull request #543 from AndreKurait/SupportLimitlessCodedOutputS…
Browse files Browse the repository at this point in the history
…tream

Support Limitless CodedOutputStreams
  • Loading branch information
AndreKurait authored Mar 29, 2024
2 parents 792c725 + 5d05384 commit b7231e9
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId,
static class CodedOutputStreamWrapper implements CodedOutputStreamHolder {
private final CodedOutputStream codedOutputStream;
private final ByteBuffer byteBuffer;
@Override
public int getOutputStreamBytesLimit() {
return byteBuffer.limit();
}

@Override
public @NonNull CodedOutputStream getOutputStream() {
return codedOutputStream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@

import java.nio.ByteBuffer;

@Getter
public class CodedOutputStreamAndByteBufferWrapper implements CodedOutputStreamHolder {
@NonNull
@Getter
private final CodedOutputStream outputStream;
@NonNull
@Getter
private final ByteBuffer byteBuffer;

public CodedOutputStreamAndByteBufferWrapper(int bufferSize) {
this.byteBuffer = ByteBuffer.allocate(bufferSize);
outputStream = CodedOutputStream.newInstance(byteBuffer);
}

public int getOutputStreamBytesLimit() {
return byteBuffer.limit();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,28 @@
import lombok.NonNull;

public interface CodedOutputStreamHolder {

/**
* Returns the maximum number of bytes that can be written to the output stream before exceeding
* its limit, or -1 if the stream has no defined limit.
*
* @return the byte limit of the output stream, or -1 if no limit exists.
*/
int getOutputStreamBytesLimit();

/**
* Calculates the remaining space in the output stream based on the limit set by
* {@link #getOutputStreamBytesLimit()}. If the limit is defined, this method returns
* the difference between that limit and the number of bytes already written. If no
* limit is defined, returns -1, indicating unbounded space.
*
* @return the number of remaining bytes that can be written before reaching the limit,
* or -1 if the stream is unbounded.
*/
default int getOutputStreamSpaceLeft() {
var limit = getOutputStreamBytesLimit();
return (limit != -1) ? limit - getOutputStream().getTotalBytesWritten() : -1;
}

@NonNull CodedOutputStream getOutputStream();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.protobuf.Descriptors;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import java.util.function.Supplier;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
Expand Down Expand Up @@ -92,6 +93,10 @@ private static int getWireTypeForFieldIndex(Descriptors.Descriptor d, int fieldN
}

private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
return getOrCreateCodedOutputStreamHolder().getOutputStream();
}

private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOException {
if (streamHasBeenClosed) {
// In an abundance of caution, flip the state back to basically act like a whole new
// stream is being setup
Expand All @@ -105,7 +110,7 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
streamHasBeenClosed = false;
}
if (currentCodedOutputStreamHolderOrNull != null) {
return currentCodedOutputStreamHolderOrNull.getOutputStream();
return currentCodedOutputStreamHolderOrNull;
} else {
currentCodedOutputStreamHolderOrNull = streamManager.createStream();
var currentCodedOutputStream = currentCodedOutputStreamHolderOrNull.getOutputStream();
Expand All @@ -122,10 +127,19 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
currentCodedOutputStream.writeBool(TrafficStream.LASTOBSERVATIONWASUNTERMINATEDREAD_FIELD_NUMBER,
readObservationsAreWaitingForEom);
}
return currentCodedOutputStream;
return currentCodedOutputStreamHolderOrNull;
}
}

public CompletableFuture<T> flushIfNeeded(Supplier<Integer> requiredSize) throws IOException {
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
if (spaceLeft != -1 && spaceLeft < requiredSize.get()) {
return flushCommitAndResetStream(false);
}
return CompletableFuture.completedFuture(null);
}


private void writeTrafficStreamTag(int fieldNumber) throws IOException {
getOrCreateCodedOutputStream().writeTag(fieldNumber,
getWireTypeForFieldIndex(TrafficStream.getDescriptor(), fieldNumber));
Expand All @@ -147,11 +161,7 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
final var captureTagNoLengthSize = CodedOutputStream.computeTagSize(captureTagFieldNumber);
final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize;
// Ensure space is available before starting an observation
if (getOrCreateCodedOutputStream().spaceLeft() <
CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1))
{
flushCommitAndResetStream(false);
}
flushIfNeeded(() -> CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1));
// e.g. 2 {
writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
// Write observation content length
Expand Down Expand Up @@ -286,20 +296,20 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
if (trafficStreamOverhead + 1 >= getOrCreateCodedOutputStream().spaceLeft()) {
flushCommitAndResetStream(false);
}
flushIfNeeded(() -> (trafficStreamOverhead + 1));

// If our message is empty or can fit in the current CodedOutputStream no chunking is needed, and we can continue
if (byteBuffer.limit() == 0 || messageAndOverheadBytesLeft <= getOrCreateCodedOutputStream().spaceLeft()) {
int minExpectedSpaceAfterObservation = getOrCreateCodedOutputStream().spaceLeft() - messageAndOverheadBytesLeft;
var spaceLeft = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
if (byteBuffer.limit() == 0 || spaceLeft == -1 || messageAndOverheadBytesLeft <= spaceLeft) {
int minExpectedSpaceAfterObservation = spaceLeft - messageAndOverheadBytesLeft;
addSubstreamMessage(captureFieldNumber, dataFieldNumber, timestamp, byteBuffer);
observationSizeSanityCheck(minExpectedSpaceAfterObservation, captureFieldNumber);
return;
}

while(byteBuffer.position() < byteBuffer.limit()) {
int availableCOSSpace = getOrCreateCodedOutputStream().spaceLeft();
// COS checked for unbounded limit above
int availableCOSSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position();
ByteBuffer bb = byteBuffer.slice();
bb.limit(chunkBytes);
Expand Down Expand Up @@ -455,8 +465,8 @@ private void writeEndOfSegmentMessage(Instant timestamp) throws IOException {
}

private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, int fieldNumber) throws IOException {
int actualRemainingSpace = getOrCreateCodedOutputStream().spaceLeft();
if (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0) {
int actualRemainingSpace = getOrCreateCodedOutputStreamHolder().getOutputStreamSpaceLeft();
if (actualRemainingSpace != -1 && (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0)) {
log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " +
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -18,17 +20,18 @@
import java.util.concurrent.ExecutionException;
import lombok.AllArgsConstructor;
import lombok.Lombok;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializerTest.StreamManager.NullStreamManager;
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication;
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

import org.opensearch.migrations.trafficcapture.protos.WriteObservation;

@Slf4j
Expand Down Expand Up @@ -80,12 +83,18 @@ private static int getIndexForTrafficStream(TrafficStream s) {

@Test
public void testLargeReadPacketIsSplit() throws IOException, ExecutionException, InterruptedException {
var bufferSize = 1024 * 1024;
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
var serializer = createSerializerWithTestHandler(outputBuffersCreated, 1024 * 1024);
var serializer = createSerializerWithTestHandler(outputBuffersCreated, bufferSize);

// Create over 1MB packet
String data = FAKE_READ_PACKET_DATA.repeat((1024 * 1024 / FAKE_READ_PACKET_DATA.length()) + 1);
double minGeneratedChunks = 2.2;
int dataRepeat = (int) Math.ceil((((double) bufferSize / FAKE_READ_PACKET_DATA.length()) * minGeneratedChunks));

String data = FAKE_READ_PACKET_DATA.repeat(dataRepeat);
byte[] fakeDataBytes = data.getBytes(StandardCharsets.UTF_8);

int expectedChunks = (fakeDataBytes.length / bufferSize) + ((fakeDataBytes.length % bufferSize == 0) ? 0 : 1);

var bb = Unpooled.wrappedBuffer(fakeDataBytes);
serializer.addReadEvent(REFERENCE_TIMESTAMP, bb);
var future = serializer.flushCommitAndResetStream(true);
Expand All @@ -95,17 +104,18 @@ public void testLargeReadPacketIsSplit() throws IOException, ExecutionException,
var outputBuffersList = new ArrayList<>(outputBuffersCreated);

var reconstitutedTrafficStreamsList = new ArrayList<TrafficStream>();
for (int i = 0; i < 2; ++i) {
for (int i = 0; i < expectedChunks; ++i) {
reconstitutedTrafficStreamsList.add(TrafficStream.parseFrom(outputBuffersList.get(i)));
}
reconstitutedTrafficStreamsList.sort(
Comparator.comparingInt(StreamChannelConnectionCaptureSerializerTest::getIndexForTrafficStream));
int totalSize = 0;
for (int i = 0; i < 2; ++i) {
for (int i = 0; i < expectedChunks; ++i) {
var reconstitutedTrafficStream = reconstitutedTrafficStreamsList.get(i);
int dataSize = reconstitutedTrafficStream.getSubStream(0).getReadSegment().getData().size();
totalSize += dataSize;
Assertions.assertEquals(i + 1, getIndexForTrafficStream(reconstitutedTrafficStream));
Assertions.assertTrue(dataSize <= bufferSize);
}
Assertions.assertEquals(fakeDataBytes.length, totalSize);
}
Expand Down Expand Up @@ -187,6 +197,22 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream()
Assertions.assertEquals(0, reconstitutedTrafficStream.getSubStream(1).getWrite().getData().size());
}

@Test
public void testWithLimitlessCodedOutputStreamHolder()
throws IOException, ExecutionException, InterruptedException {

var serializer = new StreamChannelConnectionCaptureSerializer<>(TEST_NODE_ID_STRING,
TEST_TRAFFIC_STREAM_ID_STRING,
new NullStreamManager());

var bb = Unpooled.buffer(0);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
serializer.addWriteEvent(REFERENCE_TIMESTAMP, bb);
var future = serializer.flushCommitAndResetStream(true);
future.get();
bb.release();
}

@Test
public void testThatReadCanBeDeserialized() throws IOException, ExecutionException, InterruptedException {
// these are only here as a debugging aid
Expand Down Expand Up @@ -360,5 +386,33 @@ protected CompletableFuture<Void> kickoffCloseStream(CodedOutputStreamHolder out
}
}).thenApply(x -> null);
}

static class NullStreamManager implements StreamLifecycleManager<CodedOutputStreamHolder> {

@Override
public CodedOutputStreamHolder createStream() {
return new CodedOutputStreamHolder() {
final CodedOutputStream nullOutputStream = CodedOutputStream.newInstance(
OutputStream.nullOutputStream());

@Override
public int getOutputStreamBytesLimit() {
return -1;
}

@Override
public @NonNull CodedOutputStream getOutputStream() {
return nullOutputStream;
}
};
}

@Override
public CompletableFuture<CodedOutputStreamHolder> closeStream(CodedOutputStreamHolder outputStreamHolder,
int index) {
return CompletableFuture.completedFuture(null);
}

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.OutputStream;
import lombok.Lombok;
import lombok.NonNull;
import lombok.SneakyThrows;
Expand All @@ -15,7 +16,6 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.logging.log4j.core.util.NullOutputStream;
import org.opensearch.common.settings.Settings;
import org.opensearch.migrations.tracing.RootOtelContext;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
Expand Down Expand Up @@ -190,7 +190,19 @@ protected static IConnectionCaptureFactory<Object> getNullConnectionCaptureFacto
new StreamLifecycleManager<>() {
@Override
public CodedOutputStreamHolder createStream() {
return () -> CodedOutputStream.newInstance(NullOutputStream.getInstance());
return new CodedOutputStreamHolder() {
final CodedOutputStream nullOutputStream = CodedOutputStream.newInstance(OutputStream.nullOutputStream());

@Override
public int getOutputStreamBytesLimit() {
return -1;
}

@Override
public @NonNull CodedOutputStream getOutputStream() {
return nullOutputStream;
}
};
}

@Override
Expand Down
Loading

0 comments on commit b7231e9

Please sign in to comment.