[issue-708] Add Protobuf Support (#723)
Welkin-Y authored Oct 18, 2023
1 parent faca104 commit 02fce28
Showing 5 changed files with 300 additions and 29 deletions.
2 changes: 2 additions & 0 deletions build.gradle
@@ -124,6 +124,7 @@ dependencies {

compileOnly group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
compileOnly group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion

testImplementation (group: 'io.pravega', name: 'pravega-standalone', version: pravegaVersion) {
exclude group: 'org.slf4j', module: 'slf4j-api'
@@ -146,6 +147,7 @@ dependencies {
testImplementation group: 'org.apache.flink', name: 'flink-table-planner_' + flinkScalaVersion, classifier: 'tests', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-json', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-avro', version: flinkVersion
testImplementation group: 'org.apache.flink', name: 'flink-protobuf', version: flinkVersion
testImplementation group: 'org.hamcrest', name: 'hamcrest', version: hamcrestVersion
testImplementation group: 'org.testcontainers', name: 'testcontainers', version: testcontainersVersion
testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter', version: junit5Version
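
For context, the new flink-protobuf dependency provides PbFormatConfig and the Protobuf/RowData conversion machinery the connector code below builds on. A minimal sketch of the builder call the connector makes; the class name com.example.MyEvent is a placeholder for a protoc-generated message class:

import org.apache.flink.formats.protobuf.PbFormatConfig;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;

class PbConfigSketch {
    // "com.example.MyEvent" stands in for a protoc-generated message class.
    static PbFormatConfig exampleConfig() {
        return new PbFormatConfigBuilder()
                .messageClassName("com.example.MyEvent")
                .build();
    }
}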
PravegaRegistryRowDataDeserializationSchema.java
@@ -18,13 +18,15 @@

import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.util.MessageToRowConverter;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
import io.pravega.schemaregistry.client.SchemaRegistryClientFactory;
import io.pravega.schemaregistry.contract.data.SchemaInfo;
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractDeserializer;
import io.pravega.schemaregistry.serializer.shared.impl.EncodingCache;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
@@ -36,13 +38,15 @@
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataConverters;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import com.google.protobuf.GeneratedMessageV3;

import javax.annotation.Nullable;
import java.io.IOException;
@@ -51,12 +55,17 @@
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Deserialization schema from Pravega Schema Registry to Flink Table/SQL internal data structure {@link RowData}.
* Deserialization schema from Pravega Schema Registry to Flink Table/SQL
* internal data structure {@link RowData}.
*
* <p>Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and reads the specified fields.
* <p>
* Deserializes a <code>byte[]</code> message as a Pravega Schema Registry and
* reads the specified fields.
*
* <p>Failures during deserialization are forwarded as wrapped IOExceptions.
* <p>
* Failures during deserialization are forwarded as wrapped IOExceptions.
*/
public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
private static final long serialVersionUID = 1L;
@@ -103,21 +112,33 @@ public class PravegaRegistryRowDataDeserializationSchema implements DeserializationSchema<RowData> {
/** Flag indicating whether to fail if a field is missing. */
private final boolean failOnMissingField;

/** Flag indicating whether to ignore invalid fields/rows (default: throw an exception). */
/**
* Flag indicating whether to ignore invalid fields/rows (default: throw an
* exception).
*/
private final boolean ignoreParseErrors;

/** Timestamp format specification which is used to parse timestamp. */
private final TimestampFormat timestampFormat;

// --------------------------------------------------------------------------------------------
// Protobuf fields
// --------------------------------------------------------------------------------------------

/** Protobuf serialization schema. */
private transient ProtobufSchema pbSchema;

/** Protobuf Message Class generated from static .proto file. */
private GeneratedMessageV3 pbMessage;

public PravegaRegistryRowDataDeserializationSchema(
RowType rowType,
TypeInformation<RowData> typeInfo,
String groupId,
PravegaConfig pravegaConfig,
boolean failOnMissingField,
boolean ignoreParseErrors,
TimestampFormat timestampFormat
) {
TimestampFormat timestampFormat) {
if (ignoreParseErrors && failOnMissingField) {
throw new IllegalArgumentException(
"JSON format doesn't support failOnMissingField and ignoreParseErrors are both enabled.");
@@ -135,8 +156,8 @@ public PravegaRegistryRowDataDeserializationSchema(
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
SchemaRegistryClientConfig schemaRegistryClientConfig =
SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
.getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
Expand All @@ -153,8 +174,7 @@ public void open(InitializationContext context) throws Exception {
break;
case Json:
ObjectMapper objectMapper = new ObjectMapper();
boolean hasDecimalType =
LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
boolean hasDecimalType = LogicalTypeChecks.hasNested(rowType, t -> t instanceof DecimalType);
if (hasDecimalType) {
objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
Expand All @@ -166,6 +186,10 @@ public void open(InitializationContext context) throws Exception {
config.isWriteEncodingHeader(),
objectMapper);
break;
case Protobuf:
pbSchema = ProtobufSchema.of(pbMessage.getClass());
deserializer = SerializerFactory.protobufDeserializer(config, pbSchema);
break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
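
The two registry calls in the new Protobuf case above are standard Schema Registry serializer APIs: ProtobufSchema.of(...) derives a schema from the generated message class, and SerializerFactory.protobufDeserializer(...) wraps it in a deserializer that understands the registry encoding header. A standalone sketch using raw types, as the connector itself does; the SerializerFactory import path is assumed from the schema-registry serializers package:

import com.google.protobuf.GeneratedMessageV3;
import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import io.pravega.schemaregistry.serializers.SerializerFactory;

class ProtobufDeserializerSketch {
    // 'config' is the SerializerConfig assembled earlier in open();
    // raw types mirror the connector's own fields.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Serializer protobufDeserializerFor(SerializerConfig config, GeneratedMessageV3 pbMessage) {
        ProtobufSchema pbSchema = ProtobufSchema.of(pbMessage.getClass());
        return SerializerFactory.protobufDeserializer(config, pbSchema);
    }
}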
@@ -190,20 +214,26 @@ public Object deserializeToObject(byte[] message) {
return deserializer.deserialize(ByteBuffer.wrap(message));
}

public RowData convertToRowData(Object message) {
public RowData convertToRowData(Object message) throws Exception {
Object o;
switch (serializationFormat) {
case Avro:
AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
AvroToRowDataConverters.createRowConverter(rowType);
AvroToRowDataConverters.AvroToRowDataConverter avroConverter = AvroToRowDataConverters
.createRowConverter(rowType);
o = avroConverter.convert(message);
break;
case Json:
JsonToRowDataConverters.JsonToRowDataConverter jsonConverter =
new JsonToRowDataConverters(failOnMissingField, ignoreParseErrors, timestampFormat)
.createConverter(checkNotNull(rowType));
JsonToRowDataConverters.JsonToRowDataConverter jsonConverter = new JsonToRowDataConverters(
failOnMissingField, ignoreParseErrors, timestampFormat)
.createConverter(checkNotNull(rowType));
o = jsonConverter.convert((JsonNode) message);
break;
case Protobuf:
PbFormatConfigBuilder pbConfigBuilder = new PbFormatConfigBuilder()
.messageClassName(pbMessage.getClass().getName());
MessageToRowConverter pbMessageConverter = new MessageToRowConverter(rowType, pbConfigBuilder.build());
o = pbMessageConverter.convertMessageToRow(message);
break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -214,16 +244,16 @@ private static class FlinkJsonGenericDeserializer extends AbstractDeserializer<JsonNode> {
private final ObjectMapper objectMapper;

public FlinkJsonGenericDeserializer(String groupId, SchemaRegistryClient client,
SerializerConfig.Decoders decoders, EncodingCache encodingCache,
boolean encodeHeader, ObjectMapper objectMapper) {
SerializerConfig.Decoders decoders, EncodingCache encodingCache,
boolean encodeHeader, ObjectMapper objectMapper) {
super(groupId, client, null, false, decoders, encodingCache, encodeHeader);
this.objectMapper = objectMapper;
}

@Override
public final JsonNode deserialize(InputStream inputStream,
SchemaInfo writerSchemaInfo,
SchemaInfo readerSchemaInfo) throws IOException {
SchemaInfo writerSchemaInfo,
SchemaInfo readerSchemaInfo) throws IOException {
return objectMapper.readTree(inputStream);
}
}
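
Taken together, the new Protobuf read path mirrors the Avro and JSON ones: the registry deserializer yields a Protobuf message object, and the new MessageToRowConverter utility (added by this commit but not expanded in this view) maps it to a Flink RowData. A hedged usage sketch against the public methods above, assuming an already-opened instance:

import org.apache.flink.table.data.RowData;

class ProtobufReadPathSketch {
    // 'schema' must already be open()-ed against a registry group whose
    // serialization format is Protobuf; 'payload' is a registry-encoded event.
    static RowData decode(PravegaRegistryRowDataDeserializationSchema schema, byte[] payload)
            throws Exception {
        Object pbMessage = schema.deserializeToObject(payload); // registry deserializer
        return schema.convertToRowData(pbMessage);              // MessageToRowConverter inside
    }
}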
PravegaRegistryRowDataSerializationSchema.java
@@ -19,6 +19,7 @@
import io.pravega.client.stream.Serializer;
import io.pravega.connectors.flink.PravegaConfig;
import io.pravega.connectors.flink.table.catalog.pravega.util.PravegaSchemaUtils;
import io.pravega.connectors.flink.util.RowToMessageConverter;
import io.pravega.connectors.flink.util.SchemaRegistryUtils;
import io.pravega.schemaregistry.client.SchemaRegistryClient;
import io.pravega.schemaregistry.client.SchemaRegistryClientConfig;
@@ -27,6 +28,7 @@
import io.pravega.schemaregistry.contract.data.SerializationFormat;
import io.pravega.schemaregistry.serializer.avro.schemas.AvroSchema;
import io.pravega.schemaregistry.serializer.json.schemas.JSONSchema;
import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.codec.Encoder;
import io.pravega.schemaregistry.serializer.shared.impl.AbstractSerializer;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
@@ -41,24 +43,33 @@
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.formats.protobuf.PbCodegenException;
import org.apache.flink.formats.protobuf.PbFormatConfig.PbFormatConfigBuilder;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;

import com.google.protobuf.AbstractMessage;
import com.google.protobuf.GeneratedMessageV3;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;

/**
* Serialization schema that serializes an object of Flink internal data structure {@link RowData} into
* Serialization schema that serializes an object of Flink internal data
* structure {@link RowData} into
* Pravega Schema Registry bytes.
*
* <p>Serializes the input Flink object into GenericRecord and converts it into <code>byte[]</code>.
* <p>
* Serializes the input Flink object into GenericRecord and converts it into
* <code>byte[]</code>.
*
* <p>Result <code>byte[]</code> messages can be deserialized using {@link
* <p>
* Result <code>byte[]</code> messages can be deserialized using {@link
* PravegaRegistryRowDataDeserializationSchema}.
*/
public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
@@ -113,6 +124,16 @@ public class PravegaRegistryRowDataSerializationSchema implements SerializationSchema<RowData> {
/** Flag indicating whether to serialize all decimals as plain numbers. */
private final boolean encodeDecimalAsPlainNumber;

// --------------------------------------------------------------------------------------------
// Protobuf fields
// --------------------------------------------------------------------------------------------

/** Protobuf serialization schema. */
private transient ProtobufSchema pbSchema;

/** Protobuf Message Class generated from static .proto file. */
private GeneratedMessageV3 pbMessage;

public PravegaRegistryRowDataSerializationSchema(
RowType rowType,
String groupId,
@@ -137,8 +158,8 @@ public PravegaRegistryRowDataSerializationSchema(
@SuppressWarnings("unchecked")
@Override
public void open(InitializationContext context) throws Exception {
SchemaRegistryClientConfig schemaRegistryClientConfig =
SchemaRegistryUtils.getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClientConfig schemaRegistryClientConfig = SchemaRegistryUtils
.getSchemaRegistryClientConfig(pravegaConfig);
SchemaRegistryClient schemaRegistryClient = SchemaRegistryClientFactory.withNamespace(namespace,
schemaRegistryClientConfig);
SerializerConfig config = SerializerConfig.builder()
@@ -162,6 +183,10 @@ public void open(InitializationContext context) throws Exception {
config.isRegisterSchema(),
config.isWriteEncodingHeader());
break;
case Protobuf:
pbSchema = ProtobufSchema.of(pbMessage.getClass());
serializer = SerializerFactory.protobufSerializer(config, pbSchema);
break;
default:
throw new NotImplementedException("Not supporting deserialization format");
}
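
The write side is symmetric: the same ProtobufSchema feeds SerializerFactory.protobufSerializer(...), which produces a serializer that encodes (and optionally registers) against the same group. A matching sketch under the same assumptions as the deserializer sketch above:

import com.google.protobuf.GeneratedMessageV3;
import io.pravega.client.stream.Serializer;
import io.pravega.schemaregistry.serializer.protobuf.schemas.ProtobufSchema;
import io.pravega.schemaregistry.serializer.shared.impl.SerializerConfig;
import io.pravega.schemaregistry.serializers.SerializerFactory;

class ProtobufSerializerSketch {
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Serializer protobufSerializerFor(SerializerConfig config, GeneratedMessageV3 pbMessage) {
        ProtobufSchema pbSchema = ProtobufSchema.of(pbMessage.getClass());
        return SerializerFactory.protobufSerializer(config, pbSchema);
    }
}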
@@ -176,6 +201,8 @@ public byte[] serialize(RowData row) {
return convertToByteArray(serializeToGenericRecord(row));
case Json:
return convertToByteArray(serializaToJsonNode(row));
case Protobuf:
return convertToByteArray(serializeToMessage(row));
default:
throw new NotImplementedException("Not supporting deserialization format");
}
@@ -185,8 +212,8 @@ public byte[] serialize(RowData row) {
}

public GenericRecord serializeToGenericRecord(RowData row) {
RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter =
RowDataToAvroConverters.createConverter(rowType);
RowDataToAvroConverters.RowDataToAvroConverter runtimeConverter = RowDataToAvroConverters
.createConverter(rowType);
return (GenericRecord) runtimeConverter.convert(avroSchema, row);
}

@@ -200,6 +227,13 @@ public JsonNode serializaToJsonNode(RowData row) {
return runtimeConverter.convert(mapper, node, row);
}

public AbstractMessage serializeToMessage(RowData row) throws Exception {
PbFormatConfigBuilder pbConfigBuilder = new PbFormatConfigBuilder()
.messageClassName(pbMessage.getClass().getName());
RowToMessageConverter runtimeConverter = new RowToMessageConverter(rowType, pbConfigBuilder.build());
return runtimeConverter.convertRowToProtoMessage(row);
}

@SuppressWarnings("unchecked")
public byte[] convertToByteArray(Object message) {
return serializer.serialize(message).array();
@@ -208,14 +242,16 @@ public byte[] convertToByteArray(Object message) {
@VisibleForTesting
protected static class FlinkJsonSerializer extends AbstractSerializer<JsonNode> {
private final ObjectMapper objectMapper;

public FlinkJsonSerializer(String groupId, SchemaRegistryClient client, JSONSchema schema,
Encoder encoder, boolean registerSchema, boolean encodeHeader) {
Encoder encoder, boolean registerSchema, boolean encodeHeader) {
super(groupId, client, schema, encoder, registerSchema, encodeHeader);
objectMapper = new ObjectMapper();
}

@Override
protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream) throws IOException {
protected void serialize(JsonNode jsonNode, SchemaInfo schemaInfo, OutputStream outputStream)
throws IOException {
objectMapper.writeValue(outputStream, jsonNode);
outputStream.flush();
}
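
End to end, the new write path is RowData to generated Protobuf message (via the new RowToMessageConverter utility, also not expanded in this view) to registry-encoded bytes, the inverse of the read path sketched earlier:

import com.google.protobuf.AbstractMessage;
import org.apache.flink.table.data.RowData;

class ProtobufWritePathSketch {
    // 'schema' must already be open()-ed against a Protobuf registry group.
    static byte[] encode(PravegaRegistryRowDataSerializationSchema schema, RowData row)
            throws Exception {
        AbstractMessage message = schema.serializeToMessage(row); // RowToMessageConverter
        return schema.convertToByteArray(message);                // registry serializer + header
    }
}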
(The remaining two changed files, the new utility classes io.pravega.connectors.flink.util.MessageToRowConverter and io.pravega.connectors.flink.util.RowToMessageConverter referenced above, are not expanded here.)
