
PDP 1: Schema Registry


Motivation

Pravega streams store sequences of bytes, and Pravega does not validate the information in these events. However, applications typically need to encode this information in a structured format. And as business needs change, this structure may also need to change or evolve to handle new requirements.

To handle this, applications want serialization systems that support schemas and allow those schemas to evolve. With schemas, they can define the structure of the data in an event while ensuring that both upstream and downstream applications use the correct structure, and, over time, evolve this schema to incorporate new business requirements. Without schema evolution support, updating upstream writers to publish newer structures of data may break downstream readers that are still running with an older version of the data.

When schemas are only allowed to change in a compatible fashion, downstream applications can continue to consume data without worrying about breaking because of unexpected changes to structures.

In the absence of schema management support from the streaming storage layer, applications have to build an out-of-band coordination mechanism for exchanging and evolving schemas for the data in a stream. Given that schema management is a common requirement of applications working with streams, it is desirable to perform it at the streaming storage layer.

With schema management supported by the storage layer, all applications relying on the streaming storage can benefit from it. A common schema management layer can additionally enforce schema evolution strategies and prevent events with incompatible schemas from being written by upstream applications, consequently simplifying application development for stream processing and reducing coordination across teams.

Schema Management and Application Use Cases:

Let us take time to formally define a few concepts that will be meaningful while talking about schema management.

Schema: a declaration of the structure of the data in a formal language.

Schema Evolution: changing the schema used in a stream over time. Serialization systems that support schema evolution allow different schemas to be used during writes and reads.

Compatibility: two schemas are said to be compatible if data written using one schema can be read using the other.

Schema Version: as the schemas for a message type evolve, they are captured as a chronological history. Each schema is uniquely identified by a monotonically increasing version number.

Backward Compatibility: for two schemas S1 and S2 where S1 has a lower version than S2, S1 is backward compatible with S2 if data written using S1 can be read using S2.

Forward Compatibility: for two schemas S1 and S2 where S1 has a lower version than S2, S1 is forward compatible with S2 if data written using S2 can be read using S1.
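
To make these definitions concrete, here is a minimal sketch using Avro (one of several serialization systems this proposal targets). Adding a field with a default value produces a schema that is both backward and forward compatible with its predecessor:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaCompatibility;

    public class CompatibilityExample {
        public static void main(String[] args) {
            // S1: the original schema.
            Schema s1 = SchemaBuilder.record("Sensor").fields()
                    .requiredString("id")
                    .requiredDouble("reading")
                    .endRecord();

            // S2: the evolved schema. It adds a field with a default value, which
            // in Avro is a compatible change in both directions.
            Schema s2 = SchemaBuilder.record("Sensor").fields()
                    .requiredString("id")
                    .requiredDouble("reading")
                    .name("unit").type().stringType().stringDefault("celsius")
                    .endRecord();

            // Backward: data written using S1 can be read using S2 (reader = S2).
            // The missing "unit" field is filled in from the default.
            System.out.println(SchemaCompatibility
                    .checkReaderWriterCompatibility(s2, s1).getType()); // COMPATIBLE

            // Forward: data written using S2 can be read using S1 (reader = S1).
            // The extra "unit" field is simply skipped.
            System.out.println(SchemaCompatibility
                    .checkReaderWriterCompatibility(s1, s2).getType()); // COMPATIBLE
        }
    }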

Serialization systems that allow schema evolution do not inherently attach a version to a schema. A version is a logical property we apply to a schema when it is part of an ordered group of schemas describing an entity through its evolution.

Compatibility, by definition, is the ability to read data using a different schema than the one used to write it. The direction of compatibility is applicable in the context of an order of evolution. There are no universal compatibility rules across serialization formats; for example, the rules for Avro will differ from the rules for Protobuf. However, compatibility and evolution are generic concepts applicable to all serialization systems that require a schema and support schema evolution.

Our objective is to build a schema management and serving layer that does not tie itself to any single serialization system. Any subsequent discussion about these concepts should be taken with that view.

Schema Management using a central service

For schema management we propose to build a centralized registry service, called the schema registry service, that is the single source of truth about schemas for data stored in a Pravega stream. It will group schemas for the data in a stream, apply an order to them, and enforce compatibility checks as schemas are evolved.

The service will provide logical groups under which different schemas for the data in a stream are stored. A schema group logically maps to a Pravega stream, but its usage is not limited to Pravega streams. A group is a name under which schemas are stored. The registry service maintains a versioned history of schemas for a group, assigning each schema a unique version number that identifies both the schema and its order in the group. The registry service will provide an interface, a REST endpoint, for storing and retrieving schemas for each group.

Schema evolution for the group will be subjected to compatibility checks based on the chosen strategy. The strategy could be any of forward compatibility, backward compatibility, both, or none. The service will accept only those schemas in a group that comply with the chosen compatibility strategy. Accepted schemas become part of a versioned history and are assigned unique reference identifiers called schema versions.

So far, we have described a way to store, evolve, and access schemas. These can be accessed by anyone interested in knowing the structure of data in the group. Since the data in a stream can evolve, different events may be written with different schemas or structures. Applications could benefit if there were a way to tag the data written into a stream with the schema with which it was written.

For this, the registry service will be complemented with a registry client. This registry client can encode schema information with the payload by referencing the schema with its schema version. During the write phase, the registry client encodes the writer schema version along with the payload. During the read phase, the registry client resolves the schema version to the actual schema by querying the registry service. Including schema versions instead of actual schemas reduces the payload size while still accruing the benefits of sharing schemas with downstream applications.

As mentioned earlier, the benefit of sharing schemas with downstream applications is not limited to formats like Avro where the reader requires the writer schema for deserialization. Giving applications knowledge of the exact schema with which the data was written, for all serialization formats, can greatly simplify working with and building streaming data pipelines.

Using Pravega With Schema Registry

Pravega will continue to be decoupled from serialization formats and only deal with raw bytes. Schema registry is a new complementary service that can be deployed to benefit applications working with data in Pravega streams. Managing schemas via the schema registry is optional, and applications can continue to use Pravega streams without the schema management provided by a central service.

Users can choose to use Pravega in any of the following three modes:

  1. Existing way: no schema registry.
  2. Register and evolve schemas in the registry but do not use the registry client while writing data.
  3. Register schemas in the registry and use the registry client to encode the schema id with the payload.

Because a Pravega stream can work in any of the three modes, applications have the flexibility to choose the level of support they want from the registry.

Different types of application use cases benefit from different levels of integration with the registry. We will try to classify applications and describe how they could benefit from the registry.

Applications

Generally, applications are authored to include a specific schema (or corresponding generated classes) in the code. Reader applications read the data using schema-on-read by applying a specific schema to the serialized data. They typically work with generated objects and interpret the data from the stream as specifically structured objects. With compatible evolution of schemas, these applications can continue to work even as they encounter messages encoded with different schemas. There is another class of applications that are authored without any specific schema (or corresponding generated classes) and work by learning the schema at the time of data processing. Examples of such applications include connectors, SQL engines, etc. There is an advantage in sharing the structure of the data along with the data so that such applications can be enabled.

To broadly understand the use cases, we will classify applications into two broad categories.

  1. Applications with implicit knowledge of the structure of the data they work with, by having the schema embedded in the code. These are typical applications that are compiled with specific schemas (or corresponding generated classes), and their business logic is tightly coupled with the specific structure in which they want to process the data.
  2. Applications that do not include specific schemas at compile time. They dynamically react to the data they need to process. An example of such an application is an SQL query application over streaming data. The application may not be hard coded with a schema but will request a compatible schema from the registry service that it can use to write and read data in the stream using SQL commands.

The first category of applications can benefit from sharing schemas via the registry service for formats like Avro, where the schema is required for deserialization. Other serialization formats do not necessarily benefit from sharing the exact writer schema. However, they can still benefit from using the registry service to store and evolve schemas in a compatible fashion so that downstream applications continue to work without worrying about incompatibility. This also allows downstream and upstream applications to be highly decoupled, translating to less coordination between teams and thus faster development cycles and higher productivity.

The second category of applications benefits greatly from the presence of a central store for the schemas that describe the data within a stream. Depending on the application, some could fetch a specific schema from the registry service and use the schema-on-read technique, while others may want to fetch the exact writer schema for each event they process. These applications can process data in a stream by applying the structure described in the schema to the data they read, dynamically resolving the schema corresponding to each event at runtime. This structured data can then be used to perform any meaningful business processing.

For streams where applications want or require the writer schema, writers could use the registry client, which encodes the schema version with the serialized data. Applications that are not interested in the exact structure can still use the registry service without using the registry client.

A schema registry service and client combine to enable all such applications: they make write-time schemas available to downstream applications, and the enforcement of compatibility during schema evolution guarantees that these applications will not break even as schemas evolve.

Let us take a concrete example application. Imagine an SQL application, configured with a Pravega cluster and the registry, that provides tabular views of data in Pravega streams. It would want to view the data in the stream in tabular form, and thus assumes a fixed structure for all events as they are deserialized.

Users could simply express their SQL queries as follows.

CREATE STREAM sensorData WITH (PRAVEGA-STREAM='scope/sensorDataStream'); 
SELECT * FROM sensorData;

The application would contact the registry service to retrieve a registered (typically the latest) schema and, using its structure, apply schema-on-read to all events in the stream to give tabular views of the streaming data. Take another example: this time imagine an indexing application that wants to index each event in the stream individually, using the exact structure with which the data was written. While indexing each event, this search application would retrieve the write-time schema from the registry, apply that structure to the event, create a document with all the fields in the event, and index it with a search engine.

Such applications are made possible by managing schemas at a central location in the registry service. Without a central schema management and serving layer, schemas would have to be shared out of band or in ad hoc ways, and each application developer would invent their own mechanisms.

Schema Registry Service Design:

Applications will create a group and choose a compatibility policy with the schema registry service. The compatibility policy tells the registry service which kinds of changes to the schema are allowed and which are disallowed.

Pravega applications that use the schema registry will instantiate Pravega writers and readers with a schema registry client. The client can be instantiated in two modes: 1. encode the schema version with the data, or 2. do not encode the schema version with the data. The schema registry client takes care of all communication with the registry service. When an application chooses a schema to work with, a call is made to the registry service that either implicitly registers the client's schema or retrieves its version if the schema was already registered.
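
A sketch of that handshake, written against the SchemaRegistry interface defined in the APIs section below. How the service signals that a schema is not yet registered is not specified in this PDP, so the SchemaNotFoundException used here is hypothetical:

    class RegistryHandshake {
        VersionInfo resolveOrRegister(SchemaRegistry registry, String groupId, SchemaInfo schema) {
            try {
                // Fast path: the schema was registered earlier; just look up its version.
                return registry.getSchemaVersion(groupId, schema);
            } catch (SchemaNotFoundException e) { // hypothetical exception type
                // Slow path: register the schema. The service applies the group's
                // compatibility policy before accepting it and assigning a version.
                return registry.addSchema(groupId, schema);
            }
        }
    }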

The registry service will offer multiple types of compatibility like backward, forward, bi-directional, disabled, and allow-all. Based on the chosen compatibility setting, new schemas will be checked against potentially all existing schemas for compatibility.

It is important to call out that these compatibility settings are not specific to a serialization format. Hence, the schema registry is designed to be generic and extensible, able to support multiple serialization systems.

Typically, users would prefer a serialization system that supports describing the structure of data in a formal language and allows them to evolve the schema. Popular choices are Avro, Protobuf, and Thrift. The new schema registry will aim to provide compatibility validation for popular serialization formats while, at the same time, being designed to accept plugged-in custom schema validation code for any custom serialization system.

Design Goals:

  • Provide a centralized service for storing and retrieving schemas.
  • Assign and store a versioned history of schemas for a stream.
  • Allow evolution of schemas, accepting or rejecting new schemas based on chosen compatibility settings by validating against existing versions.
  • Support multiple schema types like Avro, Protobuf, JSON, Thrift, and even custom schema formats, such that Schema Registry is fully pluggable.
  • Allow schema owners to choose a compatibility strategy.
  • Modify Pravega clients to be instantiated with specific schemas.
  • Provide Serializers that handle schema storage and retrieval from the registry and include schema version information along with the serialized data.

Non-goals:

  • Upgrading applications in accordance with the compatibility policy must be done out of band. Schema registry does not enforce correctness if the upgrade order for the policy is not followed.

Architecture

Schema registry should be distributed, durably persisting its metadata in Pravega. All schema registry service nodes will be stateless and capable of serving all queries, so there is no partitioning or distribution of groups across schema registry service nodes. All state is durably and consistently persisted in Pravega tables.

The registry service is a standalone service that acts as a serving and management layer for schemas. Schemas are grouped under a unique name with the service and evolved within that namespace. Schemas are assigned unique identifiers within that group that applications can use to refer to a schema uniquely. The service merely acts as a central registry for this information and does not impose any constraints or criteria on how its information is to be used by the applications.

Using Schema registry in Pravega applications

Pravega applications can use schema registry service to store and retrieve the schemas for the data that they are working with during runtime. For this, applications would register schemas with the registry service and then tag their data with the unique schema identifiers generated by the service. The writer applications will add the identifier as a header with the data while writing serialized events into pravega. Reader applications will parse the header to read the encoding identifier and resolve it with the schema registry service and use the information from the service as the structure describing the data that follows it.

One of the important goals for the service is to ensure that the schema layer is completely decoupled from Pravega. Pravega is a streaming storage system, and it should continue to be schemaless and work with raw bytes.

It follows naturally that Pravega streams can be used both with and without schemas, which entails that Pravega applications can use Pravega in any of three modes:

  1. No schema management using the registry service. This is the same as the existing way; the schema registry is not mandated.
  2. Use the schema registry to store schemas but do not encode schema version identifiers with the stream data. The typical use case would be serialization systems that do not require the writer schema at deserialization time, and applications that can work with the schema-on-read technique using any compatible schema. Such applications can accrue the benefits of central schema management without including the schema or a reference to it with the payload in the stream.
  3. Schema management with the schema registry, with the registry client encoding the schema identifier with the serialized payload so that downstream applications can access writer schemas should they wish to. This is absolutely essential for formats like Avro. It is also useful for other formats where downstream applications are interested in the exact writer schema.

Pravega reader and writer clients require users to supply an implementation of the Serializer interface. We provide applications a suite of schema-registry-aware serializers. These serializers are responsible for all interactions with the schema registry service.

This approach encapsulates all interaction with the schema registry inside the serializers. We will provide implementations for popular serialization formats like Avro, Protobuf, etc., along with a pluggability mechanism so users can provide implementations for custom formats.

We will next describe the interaction between the schema registry client and service for use case 3. During writer/reader instantiation, PravegaSerializers will automatically contact the schema registry with the supplied schema, and the registry will either register it as a new schema or resolve it to its already-registered version.

If the group already contains schemas, the registry performs compatibility validation of the supplied schema against the latest schema before accepting or rejecting it.

Events written using a writer with a schema will include the schema version with the serialized data. This enables downstream readers to retrieve the schema and use it for deserialization.

Pravega Serializers:

We have created a whole suite of Pravega Serializers for different serialization formats that transparently interact with the registry service and encode headers describing the encoding of the data that follows. Pravega Deserializers also maintain a cache of retrieved schemas with their encoding information. Typically, writers contact the registry service only once, during serializer initialization, to get the unique encoding identifier. Readers communicate with the schema registry once for each new encoding id they encounter; once they retrieve the information corresponding to an encoding id, they cache and reuse it.

We will use the following structure for encoding the schema id with each serialized event. It takes up additional bytes per event: the first byte is a magic byte that describes the encoding format, and the subsequent bytes encode the encoding id.

|       1 byte        |     2-4 byte     | <serialized event's length>  |
|    Encoding format  |  Encoding Id     |       Serialized Data        |

Consequently, as clients deserialize an event using Pravega's Serializers, the serializer first reads the schema version, retrieves the schema from the Schema Registry, and caches it. Readers therefore only call into the Schema Registry each time they encounter a new schema id.
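
A minimal sketch of that encode/resolve path on the client side. The 4-byte id width (the table above allows 2-4 bytes), the magic byte value, and the EncodingId constructor are assumptions; SchemaRegistry, EncodingInfo, and EncodingId refer to the API defined later in this document:

    import java.nio.ByteBuffer;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    class EventHeader {
        static final byte FORMAT_WITH_SCHEMA = 0x01; // hypothetical magic byte value

        // Writer side: prepend the 1-byte format marker and the encoding id.
        static ByteBuffer encode(int encodingId, byte[] serializedEvent) {
            ByteBuffer buf = ByteBuffer.allocate(1 + 4 + serializedEvent.length);
            buf.put(FORMAT_WITH_SCHEMA).putInt(encodingId).put(serializedEvent);
            buf.flip();
            return buf;
        }
    }

    class ResolvingReader {
        // Cache: readers contact the registry only once per new encoding id.
        private final Map<Integer, EncodingInfo> cache = new ConcurrentHashMap<>();
        private final SchemaRegistry registry;
        private final String groupId;

        ResolvingReader(SchemaRegistry registry, String groupId) {
            this.registry = registry;
            this.groupId = groupId;
        }

        // Reader side: parse the header, then resolve the id (from cache if seen before).
        EncodingInfo resolve(ByteBuffer event) {
            byte format = event.get();       // 1-byte format marker, selects the header layout
            int encodingId = event.getInt(); // encoding id; the serialized payload follows
            return cache.computeIfAbsent(encodingId,
                    id -> registry.getEncodingInfo(groupId, new EncodingId(id)));
        }
    }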

Example usage for Avro:

        Serializer<MyEvent> serializer = SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(MyEvent.class));
        Serializer<MyEvent> deserializer = SerializerFactory.avroDeserializer(serializerConfig, AvroSchema.of(MyEvent.class));
        Serializer<GenericRecord> genericDeserializer = SerializerFactory.avroGenericDeserializer(serializerConfig);

Writer and reader instantiation:

        EventStreamWriter<MyEvent> writer = clientFactory.createEventWriter(stream, serializer,
                EventWriterConfig.builder().build());
        EventStreamReader<MyEvent> reader = clientFactory.createReader("reader",
                readergroup,
                deserializer,
                ReaderConfig.builder().build());

Schema Registry will support multiple types of serialization systems. The schema is encoded as bytes and sent to the registry service for persistence. For known schema formats, the registry will parse the schema and apply schema validation rules. For a custom schema type, the user needs to provide the schema in binary form; custom schema formats are not parsed or interpreted by the schema registry or its client.

Concepts:

Group

A group refers to the name under which schemas are registered with the service. A group is identified by a unique group id and is the abstraction under which schemas are registered and evolved; it is the chief primitive exposed by the registry service. A Pravega stream maps to a group, and the corresponding group name is derived from the stream's fully qualified name. The information stored by the service for a group includes group metadata like SchemaType and Compatibility policy. Codecs and schemas are registered under the group, and the schema registry service applies the compatibility policy to all schemas registered in a group.

Schemas

Schemas are encapsulated in an object called SchemaInfo, which captures all metadata for the schema. SchemaInfo has a name for the schema, a schema type, and the schema data in binary form. It also includes a key/value properties map that holds user-defined keys and values and can be used for tagging the schema.

Version

Schema registry stores schemas under a named group. Within a group, schemas are assigned versions to capture how they evolve within the group. A version is encapsulated in an object called VersionInfo. VersionInfo contains two fields: version, which is a 32-bit counter, and the schema name. The version uniquely identifies a schema within the versioned history of a group, so every schema within a stream is uniquely identified by its version.

Codecs

Encoding of data in streams can involve more than just schemas. Schema registry supports registering codecs for a group. Codecs describe how data is encoded after serialization; a typical usage is to describe the compression applied to the data. CodecType defines the different types of encodings that can be used when writing data to the stream.
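
As an illustration, a codec simply transforms the already-serialized bytes; a gzip codec (one plausible CodecType) could look like this:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    // Illustrative codec: the event is serialized first, then encoded by the codec
    // (here, gzip compression). The codec type used is registered with the group so
    // that readers know how to decode the bytes before deserializing them.
    class GzipCodec {
        static byte[] encode(byte[] serialized) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                gz.write(serialized);
            }
            return out.toByteArray();
        }
    }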

Encoding

A codec type and schema version combination uniquely identifies the encoding of the data. This is encapsulated in an object called EncodingInfo. Schema registry generates a unique identifier for each such pair of registered schema and codec; EncodingId is a 32-bit number. After a schema is registered, whenever its version is requested together with a codec type, the encoding id that uniquely identifies that pair is returned. Pravega applications that use schema registry aware serializers work with these encoding ids transparently.

Support for multiple types of schemas within the same group:

Streams are also used in scenarios where multiple types of objects are written into the same stream. These typically include usage scenarios like event sourcing, message bus, etc. The schema registry service has first-class support for the coexistence of multiple types of events within a group. Schemas for each of these different object types are evolved independently of the other schemas in the group. To support this, a group has a property called validateByObjectType. If this flag is set to true, the schema registry service compares any new schema only with existing schemas that share the same schema name. If validateByObjectType is set to false (the default), schemas are compared with all other schemas in the group.
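
For illustration, a sketch of setting up such a group using the SchemaRegistry interface from the APIs section below; how SchemaValidationRules and the SchemaInfo instances are constructed is not defined in this PDP, so they are passed in here:

    import java.util.Collections;

    class MultiTypeGroupExample {
        void setUp(SchemaRegistry registry, SchemaValidationRules backwardRules,
                   SchemaInfo orderCreatedV1, SchemaInfo orderShippedV1, SchemaInfo orderCreatedV2) {
            // Hypothetical group for an event-sourced stream with two object types.
            registry.addGroup("scope.orders", SchemaType.Avro, backwardRules,
                    /* validateByObjectType */ true, Collections.emptyMap());

            // Each object type evolves independently: a new schema is validated only
            // against prior schemas sharing the same SchemaInfo#name.
            registry.addSchema("scope.orders", orderCreatedV1);  // name = "OrderCreated"
            registry.addSchema("scope.orders", orderShippedV1);  // name = "OrderShipped"
            registry.addSchema("scope.orders", orderCreatedV2);  // validated only against OrderCreated v1
        }
    }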

Compatibility:

As part of schema management, the registry service supports validating schemas for compatibility.

Compatibility can be defined as a group-level policy. The policy is applied at the time a new schema is added to the group. Pravega schema registry supports a range of compatibility policies that can be specified.
Broadly they are:

  1. ALLOW_ANY: Allow all schema changes without any compatibility check. The responsibility to manage compatibility is left to the schema owner out of band.
  2. BACKWARD: Validate that the new schema can be used to read data written using the previous schema.
  3. BACKWARD_TILL(x): Validate that the new schema is backward compatible with all schemas till the specified schema version.
  4. BACKWARD_TRANSITIVE: Validate that the new schema is backward compatible with all previous schemas in the group.
  5. FORWARD: Validate that the previous schema can be used to read data written using the new schema.
  6. FORWARD_TILL(x): Validate that the new schema is forward compatible with all schemas till the specified schema version.
  7. FORWARD_TRANSITIVE: Validate that the new schema is forward compatible with all previous schemas in the group.
  8. BACKWARD_TILL(x)_AND_FORWARD_TILL(y): Validate for backward compatibility till the version identified by x and for forward compatibility till the version identified by y.
  9. FULL: Validate against the latest schema that the new schema is compatible in both the forward and backward directions: the old schema can be used to read data written with the new schema, and the new schema can be used to read data written with the old schema.
  10. FULL_TRANSITIVE: Validate against all previous schemas for both forward and backward compatibility.
  11. DENY_ALL: Disallow any schema evolution/modification.

The compatibility policy is encapsulated in a more generalized construct, SchemaValidationRules. Currently the only rule supported is Compatibility, but encapsulating it in a Rules object allows us to include additional types of rules in the future.

How to choose compatibility policy:

The choice of compatibility policy depends on the scenario in which a stream is used. For example, backward compatibility ensures that data written with an old schema is readable by readers with a new schema. So backward compatibility should be chosen as the policy when it is possible to upgrade readers first. The registry service validates schemas for compatibility, but the corresponding upgrades have to be done out of band. Take an analogy with the service/client model, where schemas are analogous to APIs. Backward compatibility ensures that when the service is upgraded, it can continue to serve older clients: the service is upgraded first, followed by the clients. The same concept applies to streams, where you upgrade the readers followed by the writers.

Forward compatibility, similarly, ensures that data written with a new schema can be read by readers with an old schema. Here, you upgrade the writers first, followed by the readers.

If there is no control over which can be upgraded first, then full compatibility should be chosen. If writers and readers from multiple versions can be present as schemas are evolved, or if historical reads are expected on the streaming data, a policy with the transitive property should be chosen accordingly.

So the choice of policy should be made carefully, by understanding the usage scenario for the stream and the level of control over upgrades of writers and readers.

APIs:

Following are the logical APIs exposed by the schema registry.

interface SchemaRegistry {
    /**
     * Adds a new group. A group refers to the name under which the schemas are registered. A group is identified by a 
     * unique name and has an associated set of group metadata {@link GroupProperties} and a list of codecs and a 
     * versioned history of schemas that were registered under the group. 
     * 
     * @param groupId Id for the group that uniquely identifies the group. 
     * @param schemaType Serialization format used to encode data in the group. 
     * @param validationRules Schema validation policy to apply for the group. 
     * @param validateByObjectType Property to describe whether group should have schema compatibility checks performed by object types. 
     *                            Object Types are uniquely identified by {@link SchemaInfo#name}. 
     * @param properties          Map of properties. Example properties could include flag to indicate whether client should
     *                            encode registry service generated encoding id with payload. 
     * @return True if the group was added successfully, false if it already exists. 
     */
    boolean addGroup(String groupId, SchemaType schemaType, SchemaValidationRules validationRules,
                     boolean validateByObjectType, Map<String, String> properties);
    
    /**
     * Api to remove group. 
     * 
     * @param groupId Id for the group that uniquely identifies the group. 
     */
    void removeGroup(String groupId);

    /**
     * List all groups. 
     * 
     * @return map of names of groups with corresponding group properties for all groups. 
     */
    Map<String, GroupProperties> listGroups();
    
    /**
     * Get group properties for the group. 
     * {@link GroupProperties#schemaType} which identifies the serialization format and schema type used to describe the schema.
     * {@link GroupProperties#schemaValidationRules} sets the schema validation policy that needs to be enforced for evolving schemas.
     * {@link GroupProperties#validateByObjectType} that specifies if schemas should be exclusively validated against 
     * schemas that have the same {@link SchemaInfo#name}. 
     * {@link GroupProperties#properties} describes generic properties for a group.
     * 
     * @param groupId Id for the group.
     * @return Group properties which includes property like Schema Type and compatibility policy. 
     */
    GroupProperties getGroupProperties(String groupId);

    /**
     * Update group's schema validation policy. 
     * 
     * @param groupId Id for the group. 
     * @param validationRules New compatibility setting for the group.
     */
    void updateSchemaValidationRules(String groupId, SchemaValidationRules validationRules);
    
    /**
     * Gets list of object types registered under the group. ObjectTypes are identified by {@link SchemaInfo#name}
     * 
     * @param groupId Id for the group. 
     * @return List of objectTypes within the group.   
     */
    List<String> getObjectTypes(String groupId);

    /**
     * Registers schema to the group. If group is configured with {@link GroupProperties#validateByObjectType} then 
     * the {@link SchemaInfo#name} is used for validating against existing group schemas that share the same name. 
     * 
     * @param groupId Id for the group. 
     * @param schema Schema to add. 
     *              
     * @return versionInfo which uniquely identifies where the schema is added in the group.   
     */
    VersionInfo addSchema(String groupId, SchemaInfo schema);

    /**
     * Gets schema corresponding to the version. 
     * 
     * @param groupId Id for the group. 
     * @param version Version which uniquely identifies schema within a group. 
     * @return Schema info corresponding to the version info. 
     */
    SchemaInfo getSchema(String groupId, VersionInfo version);

    /**
     * Gets encoding info against the requested encoding Id. 
     * Encoding Info uniquely identifies a combination of a schemaInfo and codecType. 
     * 
     * @param groupId Id for the group. 
     * @param encodingId Encoding id that uniquely identifies a schema version and codec type pair within a group. 
     * @return Encoding info corresponding to the encoding id. 
     */
    EncodingInfo getEncodingInfo(String groupId, EncodingId encodingId);

    /**
     * Gets an encoding id that uniquely identifies a combination of Schema version and codec type. 
     * 
     * @param groupId Id for the group. 
     * @param version version of schema 
     * @param codecType codec type
     * @return Encoding id for the pair of version and codec type.
     */
    EncodingId getEncodingId(String groupId, VersionInfo version, CodecType codecType);

    /**
     * Gets the latest schema and version for the group (or object type, if specified). 
     * For groups configured with {@link GroupProperties#validateByObjectType}, the object type name needs to be 
     * supplied to get the latest schema for that object type. 
     * 
     * @param groupId Id for the group. 
     * @param objectTypeName Name of the object type. 
     *                 
     * @return Schema with version for the last schema that was added to the group (or objectTypeName).
     */
    SchemaWithVersion getLatestSchema(String groupId, @Nullable String objectTypeName);

    /**
     * Gets all schemas with corresponding versions for the group (or object type, if specified). 
     * For groups configured with {@link GroupProperties#validateByObjectType}, the object type name needs to be 
     * supplied to get the schemas for that object type. {@link SchemaInfo#name} is used as the object type name. 
     * The order in the list matches the order in which schemas were evolved within the group. 
     * 
     * @param groupId Id for the group.
     * @param objectTypeName Name of the object type. 
     * @return Ordered list of schemas with versions and validation rules for all schemas in the group. 
     */
    List<SchemaEvolution> getGroupEvolutionHistory(String groupId, @Nullable String objectTypeName);

    /**
     * Gets version corresponding to the schema. If group has been configured with {@link GroupProperties#validateByObjectType}
     * the version will contain the schemaName taken from the {@link SchemaInfo#name}. 
     * Version is uniquely identified by {@link SchemaInfo#schemaData}. 
     * 
     * @param groupId Id for the group. 
     * @param schema SchemaInfo that captures schema name and schema data. 
     * @return VersionInfo corresponding to schema. 
     */
    VersionInfo getSchemaVersion(String groupId, SchemaInfo schema);
    
    /**
     * Checks whether given schema is valid by applying validation rules against previous schemas in the group  
     * subject to current {@link GroupProperties#schemaValidationRules} policy.
     * This api performs exactly the same validations as {@link SchemaRegistry#addSchema(String, SchemaInfo)}
     * but without registering the schema. This is primarily to be used during schema development phase to validate that 
     * the changes to schema are in compliance with validation rules for the group.  
     * 
     * @param groupId Id for the group. 
     * @param schema Schema to check for validity. 
     * @return True if it satisfies validation checks, false otherwise. 
     */
    boolean validateSchema(String groupId, SchemaInfo schema);

    /**
     * Checks whether given schema can be used to read by validating it for reads against one or more existing schemas in the group  
     * subject to current {@link GroupProperties#schemaValidationRules} policy.
     * 
     * @param groupId Id for the group. 
     * @param schema Schema to check to be used for reads. 
     * @return True if it can be used to read, false otherwise. 
     */
    boolean canRead(String groupId, SchemaInfo schema);

    /**
     * List of codecs used for encoding in the group. 
     * 
     * @param groupId Id for the group. 
     * @return List of codecs used for encoding in the group. 
     */
    List<CodecType> getCodecs(String groupId);

    /**
     * Add new codec to be used in encoding in the group. 
     * 
     * @param groupId Id for the group. 
     * @param codecType codec type.
     */
    void addCodec(String groupId, CodecType codecType);

}
    class GroupProperties {
        private final SchemaType schemaType;
        private final SchemaValidationRules schemaValidationRules;
        private final boolean validateByObjectType;
        private final Map<String, String> properties;
    }
	
    class EncodingInfo {
        private final SchemaInfo schemaInfo;
        private final VersionInfo versionInfo;
        private final CodecType codecType;
    }

    class EncodingId {
        private final int id;
    }

    class VersionInfo {
        private final int version;
        private final String schemaName;
    }
    
    class SchemaInfo {
        private final String schemaName;
        private final SchemaType schemaType;
        private final byte[] schemaData;
        private final ImmutableMap<String, String> properties;
    }

    enum SchemaType {
        None,
        Avro,
        Protobuf,
        Json,
        Any,
        Custom
    }

    enum CodecType {
        None,
        Snappy,
        GZip,
        Custom;
    }
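
To tie the pieces together, here is a hypothetical end-to-end walk-through of the API above for a single Avro group; the validation rules and schema bytes are assumed to be constructed elsewhere:

    class RegistryWalkthrough {
        void example(SchemaRegistry registry, SchemaValidationRules rules, SchemaInfo sensorV1) {
            // 1. Create a group for the stream's data.
            registry.addGroup("scope.sensorDataStream", SchemaType.Avro, rules,
                    false, java.util.Collections.emptyMap());

            // 2. Register the writer's schema; the service validates it against the
            //    group's rules and returns its position in the versioned history.
            VersionInfo v1 = registry.addSchema("scope.sensorDataStream", sensorV1);

            // 3. Obtain the encoding id for the (schema version, codec) pair; writers
            //    embed this id in the per-event header.
            EncodingId encodingId = registry.getEncodingId("scope.sensorDataStream", v1, CodecType.None);

            // 4. Readers resolve an encoding id back to (schema, codec) once, then cache it.
            EncodingInfo info = registry.getEncodingInfo("scope.sensorDataStream", encodingId);
        }
    }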

Schema Storage

Since Pravega is a highly scalable, reliable, and durable storage system, Schema Registry will use Pravega as its underlying storage layer.

Schemas are stored under groups. Each group is backed by a pravega table segment where all its metadata is durably and consistently persisted.

A common query on the registry is finding whether a schema already exists for the stream and, if so, its version. Another common query is retrieving the schema corresponding to a version.

Typically we don't envision a large number of schemas per stream; it should be on the order of hundreds. However, retrieving and comparing all schemas from a revisioned stream could be costly, so we will maintain indexes and reverse indexes in Pravega tables to optimize all such queries.

We will index schema fingerprints with their positions in the revisioned segment. The fingerprint could be a 64-bit hash; a 64-bit hash has very little probability of collision since schemas are versioned per stream and there would be on the order of hundreds of schemas.
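
As an illustration, for Avro the stock 64-bit Rabin fingerprint over the canonical form of a schema could serve as this hash. The in-memory map below stands in for the Pravega-table-backed index, which maps a fingerprint to the schema's version:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaNormalization;

    class SchemaFingerprintIndex {
        private final Map<Long, VersionInfo> byFingerprint = new ConcurrentHashMap<>();

        void index(Schema schema, VersionInfo version) {
            // 64-bit Rabin fingerprint of the canonical (normalized) schema form.
            byFingerprint.put(SchemaNormalization.parsingFingerprint64(schema), version);
        }

        // Answers "is this schema already registered, and with which version?"
        // without retrieving and comparing every stored schema.
        VersionInfo lookup(Schema schema) {
            return byFingerprint.get(SchemaNormalization.parsingFingerprint64(schema));
        }
    }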

Compatibility Checks

Schema Registry will perform schema compatibility checks while accepting changes to schemas. A new schema will only be added to the registry if it satisfies the compatibility checks. Schema Registry will contain a CompatibilityChecker for each schema format. This will be an interface, and users can plug in compatibility checkers for their own serialization systems.

When Schema Registry receives a schema, it locates the compatibility checker corresponding to the schema type and the registered compatibility level for the stream. It then applies the compatibility checker to the new schema against existing schemas using the selected strategy.

The exact rules for compatibility for different serialization systems may vary, and hence format specific checkers will be responsible for parsing and validating schemas.

To start with, we will only support compatibility checkers for Avro and JSON. Going forward, we will look to include compatibility checkers for popular schema formats like Protobuf, Thrift, etc.

In the first implementation, the allowed values of the compatibility check strategy for formats other than Avro and JSON will be either None or Disabled.

There are two reasons for starting with Avro as the format with compatibility checks: a) Avro is one of the most popular serialization formats, and b) Avro has libraries to validate schemas for compatibility with support for multiple strategies, namely "CanRead", "CanBeRead", and "MutualRead". These can be mapped to the backward, forward, and full strategies listed above. In the future we will keep adding compatibility checkers for other popular formats. Let us take a few examples of how compatibility for different serialization systems can be specified for a stream:

Example 1:
SchemaType: Avro ; Compatibility: None
Registry will not allow the schema in the stream to change and all new schemas will be rejected. 

Example 2:
SchemaType: Avro ; Compatibility: Disabled
Registry will not enforce any compatibility check and all changes to the schemas will be accepted. 

Example 3:
SchemaType: protobuf; Compatibility: Disabled
Registry will not enforce any compatibility check and all changes to the schemas will be accepted. 

Example 4:
SchemaType: custom; Compatibility: None
Registry will not allow the schema in the stream to change and all new schemas will be rejected. 

Example 5:
SchemaType: protobuf; Compatibility: Backward
NotSupportedException  // hopefully we can support it in future

Example 6:
SchemaType: avro; Compatibility: Backward 
Check for backward compatibility with the latest schema.

Example 7:
SchemaType: avro; Compatibility: Full Transitive
Check for compatibility in both directions with all the previously stored schemas for this stream.
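
For the Avro cases above (Examples 6 and 7), here is a minimal sketch of how the registry's policies could be mapped onto Avro's built-in validation strategies; the policy-string dispatch is illustrative, not the actual service code:

    import java.util.List;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaValidationException;
    import org.apache.avro.SchemaValidator;
    import org.apache.avro.SchemaValidatorBuilder;

    class AvroCompatibilityChecker {
        // validateLatest() checks only the most recent schema; validateAll() yields
        // the *_TRANSITIVE variants.
        static SchemaValidator forPolicy(String policy) {
            switch (policy) {
                case "BACKWARD":        // new schema can read data written with the latest schema
                    return new SchemaValidatorBuilder().canReadStrategy().validateLatest();
                case "FORWARD":         // latest schema can read data written with the new schema
                    return new SchemaValidatorBuilder().canBeReadStrategy().validateLatest();
                case "FULL":            // both directions, against the latest schema
                    return new SchemaValidatorBuilder().mutualReadStrategy().validateLatest();
                case "FULL_TRANSITIVE": // both directions, against all previous schemas
                    return new SchemaValidatorBuilder().mutualReadStrategy().validateAll();
                default:
                    throw new IllegalArgumentException("unsupported policy: " + policy);
            }
        }

        static boolean isValid(SchemaValidator validator, Schema newSchema, List<Schema> existing) {
            try {
                validator.validate(newSchema, existing); // throws if incompatible
                return true;
            } catch (SchemaValidationException e) {
                return false;
            }
        }
    }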

Compatibility check during development phase

Preventing incompatible events from being ingested into Pravega at runtime has clear advantages, as discussed above. However, it is also imperative to prevent such mistakes during the development phase. The objective is to allow developers to run schema validation against a stream in production while developing the next version of their application. They should ideally be able to run validation in their development environment, on their dev machines. There should be tools available to facilitate integrating schema validation into a continuous integration pipeline, before every commit or as part of periodic builds.

There is merit in building tooling that integrates with the registry running in production. The registry exposes APIs to retrieve all schemas and to perform schema compatibility checks. We should be able to build simple tools and maven plugins that allow developers to validate their schemas against the schemas in the registry. The schema APIs are designed to expose and perform compatibility checks. However, development of such a tool is outside the scope of this PDP and shall be addressed separately.
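
A sketch of what such a check could look like in a CI pipeline, leaning on the validateSchema API defined earlier; the connect() factory, the SchemaInfo constructor, and the event name are all hypothetical:

    import java.nio.file.Files;
    import java.nio.file.Paths;

    import com.google.common.collect.ImmutableMap;

    public class ValidateSchemaCheck {
        public static void main(String[] args) throws Exception {
            String groupId = args[0];
            byte[] schemaBytes = Files.readAllBytes(Paths.get(args[1]));

            SchemaRegistry registry = connect(); // hypothetical factory for the registry client
            SchemaInfo candidate = new SchemaInfo("MyEvent", SchemaType.Avro, schemaBytes,
                    ImmutableMap.of());          // name and properties are illustrative

            // validateSchema runs the same checks as addSchema without registering,
            // so the build can fail before an incompatible schema ships.
            if (!registry.validateSchema(groupId, candidate)) {
                System.err.println("Schema change violates the group's compatibility policy");
                System.exit(1);
            }
        }

        private static SchemaRegistry connect() { // hypothetical
            throw new UnsupportedOperationException("wire up the registry client here");
        }
    }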

Multiple Language Support:

With multiple language bindings coming to Pravega clients, supporting the registry will mean providing PravegaSerializer wrappers in each of those languages. We use a simple encoding and leave the actual event serialization to the underlying serializer for the SchemaType. So as long as a serializer is available either in Rust or in the higher-layer language binding, we can support the schema registry by building a PravegaSerializer in the language of choice.

Rule engine for schema validation

Users may want to perform more semantic checks over and above compatibility checks at the time of registering a new schema. For this purpose we may provide a way for them to express such rules and run these validation rules for the schema against existing schemas for the stream. This is a future direction that could provide benefit to users.

Discarded Alternatives

  1. Building an Avro-only schema registry. We did not want schema management to be tied to a specific serialization format. We wanted to abstract the concepts of the registry and have applications using different serialization formats reap its benefits. It may be the case that the first implementation has more capabilities light up for Avro than for other formats (for example, compatibility checks), but restricting ourselves to one format would be detrimental.

  2. Support for storing SerDe libraries and dynamically loading SerDe jars into the runtime JVM. The idea stemmed from extending the benefits of a centralized service to serialization systems that do not support an external schema defined with an IDL. The use case and benefits would be similar to sharing schemas - essentially allowing applications to know and interpret the structure of the data. This was rejected because loading SerDes dynamically at runtime would have serious security implications, as it loads arbitrary pieces of code into a runtime.

There is still merit in making these SerDes available centrally to be used for dynamic linking at deployment time, though that can also be achieved through other mechanisms, like out-of-band sharing of SerDes, and we do not need to build support for it in the schema registry.

  3. Relegating schema validation to only a development-time activity, with tooling to integrate schema storage with the code repository. We discussed and evaluated whether schema management is primarily a development-time activity or whether it could have benefits during runtime as well. The discussion ranged from considering development-time tooling for schema management to facilitate greater decoupling and reduced coordination across teams working on downstream and upstream applications. We realized that there is definitely merit in providing development-time tools, but that activity is completely decoupled from the purposes a schema management layer serves at runtime. And the registry service with schema evolution support can also be leveraged during the development phase if we build tooling that allows developers to automatically have their schema changes validated against the schemas in the registry.

  4. Using third-party schema registries for schema management. We explored many available third-party schema registries built for streaming data. They were either tightly coupled with a specific serialization format or provided a view of the schemas in the context of a specific streaming system. Another problem with these registries was their schema storage backends, which would require managing an additional durable storage system when Pravega itself is a highly scalable, durable, and consistent storage backend.

Future improvements

  1. Support for additional Data Storage/Serialization Format schemas.
  2. Pluggable Compatibility Checkers for custom schema formats.
  3. Integration with security systems and governance systems like Atlas.
  4. Multiple language support for client sdk.
  5. Storage for SerDe jars.