Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements in StreamChannelConnectionCaptureSerializer #494

Merged
merged 6 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions TrafficCapture/captureOffloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ dependencies {
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.10.0'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '5.10.0'

testFixturesImplementation "com.google.protobuf:protobuf-java:3.22.2"
testFixturesImplementation project(':captureProtobufs')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
Expand All @@ -18,6 +17,8 @@
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
import org.opensearch.migrations.trafficcapture.protos.WriteObservation;
import org.opensearch.migrations.trafficcapture.protos.WriteSegmentObservation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -26,23 +27,24 @@
import java.util.concurrent.CompletableFuture;

/**
* At a basic level, this class aims to be a generic serializer which can receive ByteBuffer data and serialize the data
* into the defined Protobuf format {@link org.opensearch.migrations.trafficcapture.protos.TrafficStream}, and then write
* this formatted data to the provided CodedOutputStream.
*
* Commented throughout the class are example markers such as (e.g. 1: "1234ABCD") which line up with the textual
* representation of this Protobuf format to be used as a guide as fields are written. An example TrafficStream can
* also be visualized below for reference.
*
* 1: "91ba4f3a-0b34-11ee-be56-0242ac120002"
* This class serves as a generic serializer. Its primary function is to take ByteBuffer data,
* serialize it into the Protobuf format as defined by
* {@link org.opensearch.migrations.trafficcapture.protos.TrafficStream}, and then output
* the formatted data to a given CodedOutputStream.
* <p>
* Within the class, example markers are commented (e.g., 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2").
* These markers correspond to the textual representation of the Protobuf format and serve as a guide
* for field serialization. Below is a visual representation of an example `TrafficStream` for further reference:
* <pre>{@code
* 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"
* 5: "5ae27fca-0ac4-11ee-be56-0242ac120002"
* 2 {
* 1 {
* 1: 1683655127
* 2: 682312000
* }
* 4 {
* 1: "POST /test-index/_bulk?pretty…
* 1: "POST /test-index/_bulk?pretty…"
* }
* }
* 2 {
Expand All @@ -56,11 +58,15 @@
* }
* }
* 3: 1
* }
* </pre>
*/
@Slf4j
public class StreamChannelConnectionCaptureSerializer<T> implements IChannelConnectionCaptureSerializer<T> {

private static final int MAX_ID_SIZE = 96;
private final org.slf4j.Logger log;

// 100 is the default size of netty connectionId and kafka nodeId along with serializationTags
private static final int MAX_ID_SIZE = 100;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gregschohn, let me know if we should increase this, since 100 is exactly the size we expect from our realIds

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't have a strong opinion. If I were doing this code from scratch, I'd probably put an assertion that I could create a new CodedOutputStream from the factory that was able to write the boilerplate stuff - all within a lambda inside the assert statement.


private boolean readObservationsAreWaitingForEom;
private int eomsSoFar;
Expand All @@ -76,6 +82,15 @@ public class StreamChannelConnectionCaptureSerializer<T> implements IChannelConn

public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId,
@NonNull StreamLifecycleManager<T> streamLifecycleManager) {
this(nodeId, connectionId, streamLifecycleManager, LoggerFactory.getLogger(StreamChannelConnectionCaptureSerializer.class));
}


// Exposed for testing
public StreamChannelConnectionCaptureSerializer(String nodeId, String connectionId,
StreamLifecycleManager<T> streamLifecycleManager,
Logger log) {
this.log = log;
this.streamManager = streamLifecycleManager;
assert (nodeId == null ? 0 : CodedOutputStream.computeStringSize(TrafficStream.NODEID_FIELD_NUMBER, nodeId)) +
CodedOutputStream.computeStringSize(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionId)
Expand Down Expand Up @@ -106,7 +121,7 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
} else {
currentCodedOutputStreamHolderOrNull = streamManager.createStream();
var currentCodedOutputStream = currentCodedOutputStreamHolderOrNull.getOutputStream();
// e.g. 1: "1234ABCD"
// e.g. 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"
currentCodedOutputStream.writeString(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionIdString);
if (nodeIdString != null) {
// e.g. 5: "5ae27fca-0ac4-11ee-be56-0242ac120002"
Expand Down Expand Up @@ -145,8 +160,7 @@ private void beginSubstreamObservation(Instant timestamp, int captureTagFieldNum
final var observationContentSize = tsTagSize + tsContentSize + captureTagNoLengthSize + captureTagLengthAndContentSize;
// Ensure space is available before starting an observation
if (getOrCreateCodedOutputStream().spaceLeft() <
CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1))
{
CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numFlushesSoFar + 1)) {
flushCommitAndResetStream(false);
}
// e.g. 2 {
Expand Down Expand Up @@ -265,8 +279,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
if (captureFieldNumber == TrafficObservation.READ_FIELD_NUMBER) {
segmentFieldNumber = TrafficObservation.READSEGMENT_FIELD_NUMBER;
segmentDataFieldNumber = ReadSegmentObservation.DATA_FIELD_NUMBER;
}
else {
} else {
segmentFieldNumber = TrafficObservation.WRITESEGMENT_FIELD_NUMBER;
segmentDataFieldNumber = WriteSegmentObservation.DATA_FIELD_NUMBER;
}
Expand All @@ -276,7 +289,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
// when considering the case of a message that does not need segments or for the case of a smaller segment created
// from a much larger message
int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp,
segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
segmentFieldNumber, segmentDataFieldNumber, byteBuffer);
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

// Ensure that space for at least one data byte and overhead exists, otherwise a flush is necessary.
Expand All @@ -292,7 +305,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
return;
}

while(byteBuffer.position() < byteBuffer.limit()) {
while (byteBuffer.position() < byteBuffer.limit()) {
int availableCOSSpace = getOrCreateCodedOutputStream().spaceLeft();
int chunkBytes = messageAndOverheadBytesLeft > availableCOSSpace ? availableCOSSpace - trafficStreamOverhead : byteBuffer.limit() - byteBuffer.position();
ByteBuffer bb = byteBuffer.slice();
Expand All @@ -313,7 +326,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
}

private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount,
Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
Instant timestamp, java.nio.ByteBuffer byteBuffer) throws IOException {
int dataSize = 0;
int segmentCountSize = 0;
int captureClosureLength = 1;
Expand Down Expand Up @@ -345,7 +358,7 @@ private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, in
}

private void addSubstreamMessage(int captureFieldNumber, int dataFieldNumber, Instant timestamp,
java.nio.ByteBuffer byteBuffer) throws IOException {
java.nio.ByteBuffer byteBuffer) throws IOException {
addSubstreamMessage(captureFieldNumber, dataFieldNumber, 0, 0, timestamp, byteBuffer);
}

Expand Down Expand Up @@ -452,8 +465,8 @@ private void observationSizeSanityCheck(int minExpectedSpaceAfterObservation, in
int actualRemainingSpace = getOrCreateCodedOutputStream().spaceLeft();
if (actualRemainingSpace < minExpectedSpaceAfterObservation || minExpectedSpaceAfterObservation < 0) {
log.warn("Writing a substream (capture type: {}) for Traffic Stream: {} left {} bytes in the CodedOutputStream but we calculated " +
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
"at least {} bytes remaining, this should be investigated", fieldNumber, connectionIdString + "." + (numFlushesSoFar + 1),
actualRemainingSpace, minExpectedSpaceAfterObservation);
}
}
}
Loading
Loading