Enhancements in StreamChannelConnectionCaptureSerializer #494

Merged
merged 6 commits into from
Feb 2, 2024
@@ -26,23 +26,24 @@
import java.util.concurrent.CompletableFuture;

/**
* At a basic level, this class aims to be a generic serializer which can receive ByteBuffer data and serialize the data
* into the defined Protobuf format {@link org.opensearch.migrations.trafficcapture.protos.TrafficStream}, and then write
* this formatted data to the provided CodedOutputStream.
*
* Commented throughout the class are example markers such as (e.g. 1: "1234ABCD") which line up with the textual
* representation of this Protobuf format to be used as a guide as fields are written. An example TrafficStream can
* also be visualized below for reference.
*
* 1: "91ba4f3a-0b34-11ee-be56-0242ac120002"
* This class serves as a generic serializer. Its primary function is to take ByteBuffer data,
* serialize it into the Protobuf format as defined by
* {@link org.opensearch.migrations.trafficcapture.protos.TrafficStream}, and then output
* the formatted data to a given CodedOutputStream.
* <p>
* Within the class, example markers are commented (e.g., 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2").
* These markers correspond to the textual representation of the Protobuf format and serve as a guide
* for field serialization. Below is a visual representation of an example `TrafficStream` for further reference:
* <pre>{@code
* 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"
* 5: "5ae27fca-0ac4-11ee-be56-0242ac120002"
* 2 {
* 1 {
* 1: 1683655127
* 2: 682312000
* }
* 4 {
* 1: "POST /test-index/_bulk?pretty…
* 1: "POST /test-index/_bulk?pretty…"
* }
* }
* 2 {
@@ -56,11 +57,14 @@
* }
* }
* 3: 1
* }
* </pre>
*/
@Slf4j
public class StreamChannelConnectionCaptureSerializer<T> implements IChannelConnectionCaptureSerializer<T> {

private static final int MAX_ID_SIZE = 96;
// 100 is the default size of netty connectionId and kafka nodeId along with serializationTags
private static final int MAX_ID_SIZE = 100;
Member Author

@gregschohn, let me know if we should increase this as 100 is exactly what we expect out of our realIds

Collaborator

I don't have a strong opinion. If I were doing this code from scratch, I'd probably put an assertion that I could create a new CodedOutputStream from the factory that was able to write the boilerplate stuff - all within a lambda inside the assert statement.
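For reference, the arithmetic behind the 100-byte figure can be checked by hand. The sketch below is not the PR's code and does not use the real `CodedOutputStream` or the stream factory mentioned above; it swaps in a hand-computed Protobuf size (tag varint + length varint + UTF-8 payload) for the two sample IDs from the Javadoc. The class name `IdSizeSketch`, its helper methods, and the use of field numbers 1 and 5 (taken from the example markers) are assumptions for illustration. Whether `MAX_ID_SIZE` is meant to cover anything beyond these two fields is not stated in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the PR: estimates how many bytes the
// connectionId and nodeId fields occupy once Protobuf-encoded.
final class IdSizeSketch {
    // Mirrors the constant introduced in the diff.
    static final int MAX_ID_SIZE = 100;

    // Bytes needed to encode a non-negative int as a varint (7 payload bits per byte).
    static int varintSize(int value) {
        int size = 1;
        while ((value >>>= 7) != 0) {
            size++;
        }
        return size;
    }

    // Size of a length-delimited string field: tag + length prefix + UTF-8 payload.
    static int stringFieldSize(int fieldNumber, String value) {
        int payload = value.getBytes(StandardCharsets.UTF_8).length;
        int tag = (fieldNumber << 3) | 2; // wire type 2 = length-delimited
        return varintSize(tag) + varintSize(payload) + payload;
    }

    // Assumes field 1 holds the connectionId and field 5 the nodeId,
    // as suggested by the example markers in the Javadoc.
    static int idBoilerplateSize(String connectionId, String nodeId) {
        return stringFieldSize(1, connectionId) + stringFieldSize(5, nodeId);
    }

    public static void main(String[] args) {
        String connectionId = "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2";
        String nodeId = "5ae27fca-0ac4-11ee-be56-0242ac120002";

        // A check wrapped in a lambda so the work only runs when assertions
        // are enabled (java -ea), loosely in the spirit of the review comment.
        BooleanSupplier fits = () -> idBoilerplateSize(connectionId, nodeId) <= MAX_ID_SIZE;
        assert fits.getAsBoolean() : "sample IDs exceed MAX_ID_SIZE";

        System.out.println(idBoilerplateSize(connectionId, nodeId)); // prints 100
    }
}
```

With the sample values, the connectionId field takes 62 bytes (60 characters plus a one-byte tag and a one-byte length) and the nodeId field takes 38 bytes, which is consistent with the "exactly 100" observation in the thread. A longer ID, or any multi-byte UTF-8 character, would push the total past the limit.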


private boolean readObservationsAreWaitingForEom;
private int eomsSoFar;
@@ -106,7 +110,7 @@ private CodedOutputStream getOrCreateCodedOutputStream() throws IOException {
} else {
currentCodedOutputStreamHolderOrNull = streamManager.createStream();
var currentCodedOutputStream = currentCodedOutputStreamHolderOrNull.getOutputStream();
// e.g. 1: "1234ABCD"
// e.g. 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"
currentCodedOutputStream.writeString(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionIdString);
if (nodeIdString != null) {
// e.g. 5: "5ae27fca-0ac4-11ee-be56-0242ac120002"