Skip to content

Commit

Permalink
[Schema Registry Avro] New Encoder Design (Azure#19842)
Browse files Browse the repository at this point in the history
Fixes Azure#20061

## Overview
Revamps the schema registry encoder to work on messages instead of buffers based on the recommendation of the Azure messaging architect.

This changes the APIs as follows:
```ts
  const buffer: NodeJS.Buffer = await serializer.serialize(value, schema);
```
becomes
```ts
  const message: MessageWithMetadata = await encoder.encodeMessageData(value, schema);
```
where `MessageWithMetadata` has a `body` field as well as a `contentType` field. The latter's format is `avro/binary+<Schema ID>`

For derserializing, the change is as follows:
```ts
  const deserializedValue = await serializer.deserialize(buffer);
```
becomes:
```ts
  const decodedObject = await encoder.decodeMessageData(message);
```

## Improvement upon  Azure#15959

This design introduces a new `messageAdapter` option in the encoder constructor to support processing of any message type (e.g. [cloud event](https://github.com/cloudevents/spec/blob/v1.0.1/spec.md)):

```ts
  const encoder = new SchemaRegistryAvroEncoder(schemaRegistryClient, {
    groupName,
    messageAdapter: adapter
  });
```

where `adapter` is a message adapter that follows the following contract:

```ts
interface MessageAdapter<MessageT> {
  produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT;
  consumeMessage: (message: MessageT) => MessageWithMetadata;
}

 interface MessageWithMetadata {
  body: Uint8Array;
  contentType: string;
}
```

For convenience, the PR adds a couple of convenience adapter factories for Event Hubs's `EventData` and Event Grid's `SendCloudEventInput<Uint8Array>`. For example, the `createCloudEventAdapter` factory can be called to construct an adapter for the latter as follows:

```ts
const adapter = createCloudEventAdapter({
      type: "azure.sdk.eventgrid.samples.cloudevent",
      source: "/azure/sdk/schemaregistry/samples/withEventGrid",
    }),
```

Note that these adapter factories are exported by their respective messaging package without explicitly implementing the contract and the PR adds new encoder tests that check whether the produced adapters follow the contract. This organization could change in the future if we create a new core place for the contract to be imported from.

See the newly added samples for how to send such messages with Event Hubs and Event Grid.

Schema Registry commitment tracking: Azure#15959
Tracking issue: Azure#18608
First iteration design: Azure#18365
  • Loading branch information
deyaaeldeen authored Jan 26, 2022
1 parent a3f10ce commit be4578a
Show file tree
Hide file tree
Showing 26 changed files with 1,219 additions and 477 deletions.
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"clean": "rimraf dist dist-* temp types coverage coverage-browser .nyc_output *.tgz *.log test*.xml",
"execute:samples": "echo skipped",
"extract-api": "tsc -p . && api-extractor run --local",
"format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"samples-dev/**/*.ts\" \"*.{js,json}\"",
"format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"",
"integration-test:browser": "echo skipped",
"integration-test:node": "echo skipped",
"integration-test": "npm run integration-test:node && npm run integration-test:browser",
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/core-amqp/src/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export interface WebSocketOptions {
* A constant that indicates whether the environment is node.js or browser based.
*/
export const isNode =
!!process && !!process.version && !!process.versions && !!process.versions.node;
typeof process !== "undefined" && Boolean(process.version) && Boolean(process.versions?.node);

/**
* Defines an object with possible properties defined in T.
Expand Down
24 changes: 24 additions & 0 deletions sdk/eventhub/event-hubs/review/event-hubs.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ export interface CreateBatchOptions extends OperationOptions {
partitionKey?: string;
}

// @public
export function createEventDataAdapter(params?: EventDataAdapterParameters): MessageAdapter<EventData>;

// @public
export const earliestEventPosition: EventPosition;

Expand All @@ -79,6 +82,15 @@ export interface EventData {
};
}

// @public
export interface EventDataAdapterParameters {
correlationId?: string | number | Buffer;
messageId?: string | number | Buffer;
properties?: {
[key: string]: any;
};
}

// @public
export interface EventDataBatch {
readonly count: number;
Expand Down Expand Up @@ -225,6 +237,18 @@ export interface LoadBalancingOptions {
// @public
export const logger: AzureLogger;

// @public
export interface MessageAdapter<MessageT> {
consumeMessage: (message: MessageT) => MessageWithMetadata;
produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT;
}

// @public
export interface MessageWithMetadata {
body: Uint8Array;
contentType: string;
}

export { MessagingError }

// @public
Expand Down
103 changes: 103 additions & 0 deletions sdk/eventhub/event-hubs/src/eventDataAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

import { EventData } from "./eventData";

/**
* A message with payload and content type fields
*
* This interface is hidden because it is already exported by `@azure/schema-registry-avro`
*
* @hidden
*/
export interface MessageWithMetadata {
/**
* The message's binary data
*/
body: Uint8Array;
/**
* The message's content type
*/
contentType: string;
}

/**
* A message adapter interface that specifies methods for producing and consuming
* messages with payloads and content type fields.
*
* This interface is hidden because it is already exported by `@azure/schema-registry-avro`
*
* @hidden
*/
export interface MessageAdapter<MessageT> {
/**
* defines how to create a message from a payload and a content type
*/
produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT;
/**
* defines how to access the payload and the content type of a message
*/
consumeMessage: (message: MessageT) => MessageWithMetadata;
}

// This type should always be equivalent to Omit<Omit<EventData, "body">, "contentType">
/**
* Parameters to the `createEventDataAdapter` function that creates an event data adapter.
*/
export interface EventDataAdapterParameters {
/**
* The correlation identifier that allows an
* application to specify a context for the message for the purposes of correlation, for example
* reflecting the MessageId of a message that is being replied to.
*/
correlationId?: string | number | Buffer;

/**
* The message identifier is an
* application-defined value that uniquely identifies the message and its payload.
*
* Note: Numbers that are not whole integers are not allowed.
*/
messageId?: string | number | Buffer;

/**
* Set of key value pairs that can be used to set properties specific to user application.
*/
properties?: {
[key: string]: any;
};
}

/**
* A function that constructs an event data adapter. That adapter can be used
* with `@azure/schema-registry-avro` to encode and decode body in event data.
*
* @param params - parameters to create the event data
* @returns An event data adapter that can produce and consume event data
*/
export function createEventDataAdapter(
params: EventDataAdapterParameters = {}
): MessageAdapter<EventData> {
return {
produceMessage: ({ body, contentType }: MessageWithMetadata) => {
return {
...params,
body,
contentType,
};
},
consumeMessage: (message: EventData): MessageWithMetadata => {
const { body, contentType } = message;
if (body === undefined || !(body instanceof Uint8Array)) {
throw new Error("Expected the body field to be defined and have a Uint8Array");
}
if (contentType === undefined) {
throw new Error("Expected the contentType field to be defined");
}
return {
body,
contentType,
};
},
};
}
2 changes: 2 additions & 0 deletions sdk/eventhub/event-hubs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,5 @@ export {
parseEventHubConnectionString,
EventHubConnectionStringProperties,
} from "./util/connectionStringUtils";

export * from "./eventDataAdapter";
6 changes: 6 additions & 0 deletions sdk/schemaregistry/schema-registry-avro/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@

### Features Added

- The serializer APIs have been revamped to work on messages instead of buffers where the payload is the pure encoded-data. The schema ID became part of the content type of that message. This change will improve the experience of using this encoder with the other messaging clients (e.g. Event Hubs, Service Bus, and Event Grid clients).
- `decodeMessageData` now supports decoding using a different but compatible schema

### Breaking Changes
- The `SchemaRegistryAvroSerializer` class has been renamed to `SchemaRegistryAvroEncoder`
- The `serialize` method has been renamed to `encodeMessageData` and it now returns a message
- The `deserialize` method has been renamed to `decodeMessageData` and it now takes a message as input

### Bugs Fixed

Expand Down
72 changes: 38 additions & 34 deletions sdk/schemaregistry/schema-registry-avro/README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Azure Schema Registry Avro serializer client library for JavaScript
# Azure Schema Registry Avro Encoder client library for JavaScript

Azure Schema Registry is a schema repository service hosted by Azure Event Hubs,
providing schema storage, versioning, and management. This package provides an
Avro serializer capable of serializing and deserializing payloads containing
Schema Registry schema identifiers and Avro-encoded data.
Avro encoder capable of encoding and decoding payloads containing
Avro-encoded data.

Key links:

Expand Down Expand Up @@ -31,64 +31,68 @@ npm install @azure/schema-registry-avro

## Key concepts

### SchemaRegistryAvroSerializer
### SchemaRegistryAvroEncoder

Provides API to serialize to and deserialize from Avro Binary Encoding plus a
header with schema ID. Uses
Provides API to encode to and decode from Avro Binary Encoding wrapped in a message
with a content type field containing the schema ID. Uses
`SchemaRegistryClient` from the [@azure/schema-registry](https://www.npmjs.com/package/@azure/schema-registry) package
to get schema IDs from schema definition or vice versa. The provided API has internal cache to avoid calling the schema registry service when possible.

### Message format
### Messages

The same format is used by schema registry serializers across Azure SDK languages.
By default, the encoder will create messages structured as follows:

Messages are encoded as follows:
- `body`: a byte array containing data in the Avro Binary Encoding. Note that it
is NOT Avro Object Container File. The latter includes the schema and creating
it defeats the purpose of using this encoder to move the schema out of the
message payload and into the schema registry.

- 4 bytes: Format Indicator
- `contentType`: a string of the following format `avro/binary+<Schema ID>` where
the `avro/binary` part signals that this message has an Avro-encoded payload
and the `<Schema Id>` part is the Schema ID the Schema Registry service assigned
to the schema used to encode this payload.

- Currently always zero to indicate format below.

- 32 bytes: Schema ID

- UTF-8 hexadecimal representation of GUID.
- 32 hex digits, no hyphens.
- Same format and byte order as string from Schema Registry service.

- Remaining bytes: Avro payload (in general, format-specific payload)

- Avro Binary Encoding
- NOT Avro Object Container File, which includes the schema and defeats the
purpose of this serialzer to move the schema out of the message payload and
into the schema registry.
Not all messaging services are supporting the same message structure. To enable
integration with such services, the encoder can act on custom message structures
by setting the `messageAdapter` option in the constructor with a corresponding
message producer and consumer. Azure messaging client libraries export default
adapters for their message types.

## Examples

### Serialize and deserialize
### Encode and decode an `@azure/event-hubs`'s `EventData`

```javascript
const { DefaultAzureCredential } = require("@azure/identity");
import { createEventDataAdapter } from "@azure/event-hubs";
const { SchemaRegistryClient } = require("@azure/schema-registry");
const { SchemaRegistryAvroSerializer } = require("@azure/schema-registry-avro");

const client = new SchemaRegistryClient("<fully qualified namespace>", new DefaultAzureCredential());
const serializer = new SchemaRegistryAvroSerializer(client, { groupName: "<group>" });
const { SchemaRegistryAvroEncoder } = require("@azure/schema-registry-avro");

const client = new SchemaRegistryClient(
"<fully qualified namespace>",
new DefaultAzureCredential()
);
const encoder = new SchemaRegistryAvroEncoder(client, {
groupName: "<group>",
messageAdapter: createEventDataAdapter(),
});

// Example Avro schema
const schema = JSON.stringify({
type: "record",
name: "Rating",
namespace: "my.example",
fields: [{ name: "score", type: "int" }]
fields: [{ name: "score", type: "int" }],
});

// Example value that matches the Avro schema above
const value = { score: 42 };

// Serialize value to buffer
const buffer = await serializer.serialize(value, schema);
// Encode value to a message
const message = await encoder.encodeMessageData(value, schema);

// Deserialize buffer to value
const deserializedValue = await serializer.deserialize(buffer);
// Decode a message to value
const decodedValue = await encoder.decodeMessageData(message);
```

## Troubleshooting
Expand Down
2 changes: 2 additions & 0 deletions sdk/schemaregistry/schema-registry-avro/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@
"devDependencies": {
"@azure/dev-tool": "^1.0.0",
"@azure/eslint-plugin-azure-sdk": "^3.0.0",
"@azure/event-hubs": "^5.7.0-beta.2",
"@azure/identity": "^2.0.1",
"@azure/test-utils": "^1.0.0",
"@azure-tools/test-recorder": "^1.0.0",
"@microsoft/api-extractor": "^7.18.11",
"@rollup/plugin-commonjs": "11.0.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,37 @@
```ts

/// <reference types="node" />

import { SchemaRegistry } from '@azure/schema-registry';

// @public
export class SchemaRegistryAvroSerializer {
constructor(client: SchemaRegistry, options?: SchemaRegistryAvroSerializerOptions);
deserialize(input: Buffer | Blob | Uint8Array): Promise<unknown>;
serialize(value: unknown, schema: string): Promise<Uint8Array>;
export interface DecodeMessageDataOptions {
schema?: string;
}

// @public
export interface MessageAdapter<MessageT> {
consumeMessage: (message: MessageT) => MessageWithMetadata;
produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT;
}

// @public
export interface MessageWithMetadata {
body: Uint8Array;
contentType: string;
}

// @public
export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
constructor(client: SchemaRegistry, options?: SchemaRegistryAvroEncoderOptions<MessageT>);
decodeMessageData(message: MessageT, options?: DecodeMessageDataOptions): Promise<unknown>;
encodeMessageData(value: unknown, schema: string): Promise<MessageT>;
}

// @public
export interface SchemaRegistryAvroSerializerOptions {
export interface SchemaRegistryAvroEncoderOptions<MessageT> {
autoRegisterSchemas?: boolean;
groupName?: string;
messageAdapter?: MessageAdapter<MessageT>;
}

// (No @packageDocumentation comment for this package)
Expand Down
9 changes: 9 additions & 0 deletions sdk/schemaregistry/schema-registry-avro/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,12 @@ SCHEMA_REGISTRY_GROUP=<Group name for schemas in registry>
AZURE_TENANT_ID=<AD tenant id or name>
AZURE_CLIENT_ID=<ID of the user/service principal to authenticate as>
AZURE_CLIENT_SECRET=<client secret used to authenticate to Azure AD>

# Used in samples that use Event Hubs. Retrieve these values from an Event Hub in the Azure portal.
EVENTHUB_CONNECTION_STRING=<Event Hub connection string>
EVENTHUB_NAME=<Event Hub name>
CONSUMER_GROUP_NAME=<Event Hub Group name>

# Used in samples that use Event Grid. Retrieve these values from an Event Grid topic in the Azure portal
EVENT_GRID_TOPIC_ENDPOINT=<Event Grid topic endpoint>
EVENT_GRID_TOPIC_API_KEY=<Event Grid topic API key>
Loading

0 comments on commit be4578a

Please sign in to comment.