diff --git a/sdk/core/core-amqp/package.json b/sdk/core/core-amqp/package.json index 90a16b0f7016..6e434fdfe433 100644 --- a/sdk/core/core-amqp/package.json +++ b/sdk/core/core-amqp/package.json @@ -43,7 +43,7 @@ "clean": "rimraf dist dist-* temp types coverage coverage-browser .nyc_output *.tgz *.log test*.xml", "execute:samples": "echo skipped", "extract-api": "tsc -p . && api-extractor run --local", - "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"samples-dev/**/*.ts\" \"*.{js,json}\"", + "format": "prettier --write --config ../../../.prettierrc.json --ignore-path ../../../.prettierignore \"src/**/*.ts\" \"test/**/*.ts\" \"*.{js,json}\"", "integration-test:browser": "echo skipped", "integration-test:node": "echo skipped", "integration-test": "npm run integration-test:node && npm run integration-test:browser", diff --git a/sdk/core/core-amqp/src/util/utils.ts b/sdk/core/core-amqp/src/util/utils.ts index ac73f74e3eb1..68333b02b7b8 100644 --- a/sdk/core/core-amqp/src/util/utils.ts +++ b/sdk/core/core-amqp/src/util/utils.ts @@ -58,7 +58,7 @@ export interface WebSocketOptions { * A constant that indicates whether the environment is node.js or browser based. */ export const isNode = - !!process && !!process.version && !!process.versions && !!process.versions.node; + typeof process !== "undefined" && Boolean(process.version) && Boolean(process.versions?.node); /** * Defines an object with possible properties defined in T. 
diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 6d028766d38c..03c3baec2ee0 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -61,6 +61,9 @@ export interface CreateBatchOptions extends OperationOptions { partitionKey?: string; } +// @public +export function createEventDataAdapter(params?: EventDataAdapterParameters): MessageAdapter; + // @public export const earliestEventPosition: EventPosition; @@ -79,6 +82,15 @@ export interface EventData { }; } +// @public +export interface EventDataAdapterParameters { + correlationId?: string | number | Buffer; + messageId?: string | number | Buffer; + properties?: { + [key: string]: any; + }; +} + // @public export interface EventDataBatch { readonly count: number; @@ -225,6 +237,18 @@ export interface LoadBalancingOptions { // @public export const logger: AzureLogger; +// @public +export interface MessageAdapter { + consumeMessage: (message: MessageT) => MessageWithMetadata; + produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT; +} + +// @public +export interface MessageWithMetadata { + body: Uint8Array; + contentType: string; +} + export { MessagingError } // @public diff --git a/sdk/eventhub/event-hubs/src/eventDataAdapter.ts b/sdk/eventhub/event-hubs/src/eventDataAdapter.ts new file mode 100644 index 000000000000..1813dcebac9b --- /dev/null +++ b/sdk/eventhub/event-hubs/src/eventDataAdapter.ts @@ -0,0 +1,103 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +import { EventData } from "./eventData"; + +/** + * A message with payload and content type fields + * + * This interface is hidden because it is already exported by `@azure/schema-registry-avro` + * + * @hidden + */ +export interface MessageWithMetadata { + /** + * The message's binary data + */ + body: Uint8Array; + /** + * The message's content type + */ + contentType: string; +} + +/** + * A message adapter interface that specifies methods for producing and consuming + * messages with payloads and content type fields. + * + * This interface is hidden because it is already exported by `@azure/schema-registry-avro` + * + * @hidden + */ +export interface MessageAdapter { + /** + * defines how to create a message from a payload and a content type + */ + produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT; + /** + * defines how to access the payload and the content type of a message + */ + consumeMessage: (message: MessageT) => MessageWithMetadata; +} + +// This type should always be equivalent to Omit, "contentType"> +/** + * Parameters to the `createEventDataAdapter` function that creates an event data adapter. + */ +export interface EventDataAdapterParameters { + /** + * The correlation identifier that allows an + * application to specify a context for the message for the purposes of correlation, for example + * reflecting the MessageId of a message that is being replied to. + */ + correlationId?: string | number | Buffer; + + /** + * The message identifier is an + * application-defined value that uniquely identifies the message and its payload. + * + * Note: Numbers that are not whole integers are not allowed. + */ + messageId?: string | number | Buffer; + + /** + * Set of key value pairs that can be used to set properties specific to user application. + */ + properties?: { + [key: string]: any; + }; +} + +/** + * A function that constructs an event data adapter. 
That adapter can be used + * with `@azure/schema-registry-avro` to encode and decode body in event data. + * + * @param params - parameters to create the event data + * @returns An event data adapter that can produce and consume event data + */ +export function createEventDataAdapter( + params: EventDataAdapterParameters = {} +): MessageAdapter { + return { + produceMessage: ({ body, contentType }: MessageWithMetadata) => { + return { + ...params, + body, + contentType, + }; + }, + consumeMessage: (message: EventData): MessageWithMetadata => { + const { body, contentType } = message; + if (body === undefined || !(body instanceof Uint8Array)) { + throw new Error("Expected the body field to be defined and have a Uint8Array"); + } + if (contentType === undefined) { + throw new Error("Expected the contentType field to be defined"); + } + return { + body, + contentType, + }; + }, + }; +} diff --git a/sdk/eventhub/event-hubs/src/index.ts b/sdk/eventhub/event-hubs/src/index.ts index 92ca9e5fb5d3..ef65143cbf71 100644 --- a/sdk/eventhub/event-hubs/src/index.ts +++ b/sdk/eventhub/event-hubs/src/index.ts @@ -51,3 +51,5 @@ export { parseEventHubConnectionString, EventHubConnectionStringProperties, } from "./util/connectionStringUtils"; + +export * from "./eventDataAdapter"; diff --git a/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md b/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md index f4f295de993e..c78610a6c888 100644 --- a/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md +++ b/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md @@ -4,7 +4,13 @@ ### Features Added +- The serializer APIs have been revamped to work on messages instead of buffers where the payload is the pure encoded-data. The schema ID became part of the content type of that message. This change will improve the experience of using this encoder with the other messaging clients (e.g. Event Hubs, Service Bus, and Event Grid clients). 
+- `decodeMessageData` now supports decoding using a different but compatible schema + ### Breaking Changes +- The `SchemaRegistryAvroSerializer` class has been renamed to `SchemaRegistryAvroEncoder` +- The `serialize` method has been renamed to `encodeMessageData` and it now returns a message +- The `deserialize` method has been renamed to `decodeMessageData` and it now takes a message as input ### Bugs Fixed diff --git a/sdk/schemaregistry/schema-registry-avro/README.md b/sdk/schemaregistry/schema-registry-avro/README.md index 0cfbf49fd32b..ce80a3b04e8c 100644 --- a/sdk/schemaregistry/schema-registry-avro/README.md +++ b/sdk/schemaregistry/schema-registry-avro/README.md @@ -1,9 +1,9 @@ -# Azure Schema Registry Avro serializer client library for JavaScript +# Azure Schema Registry Avro Encoder client library for JavaScript Azure Schema Registry is a schema repository service hosted by Azure Event Hubs, providing schema storage, versioning, and management. This package provides an -Avro serializer capable of serializing and deserializing payloads containing -Schema Registry schema identifiers and Avro-encoded data. +Avro encoder capable of encoding and decoding payloads containing +Avro-encoded data. Key links: @@ -31,64 +31,68 @@ npm install @azure/schema-registry-avro ## Key concepts -### SchemaRegistryAvroSerializer +### SchemaRegistryAvroEncoder -Provides API to serialize to and deserialize from Avro Binary Encoding plus a -header with schema ID. Uses +Provides API to encode to and decode from Avro Binary Encoding wrapped in a message +with a content type field containing the schema ID. Uses `SchemaRegistryClient` from the [@azure/schema-registry](https://www.npmjs.com/package/@azure/schema-registry) package to get schema IDs from schema definition or vice versa. The provided API has internal cache to avoid calling the schema registry service when possible. 
-### Message format +### Messages -The same format is used by schema registry serializers across Azure SDK languages. +By default, the encoder will create messages structured as follows: -Messages are encoded as follows: +- `body`: a byte array containing data in the Avro Binary Encoding. Note that it + is NOT Avro Object Container File. The latter includes the schema and creating + it defeats the purpose of using this encoder to move the schema out of the + message payload and into the schema registry. -- 4 bytes: Format Indicator +- `contentType`: a string of the following format `avro/binary+` where + the `avro/binary` part signals that this message has an Avro-encoded payload + and the `` part is the Schema ID the Schema Registry service assigned + to the schema used to encode this payload. - - Currently always zero to indicate format below. - -- 32 bytes: Schema ID - - - UTF-8 hexadecimal representation of GUID. - 32 hex digits, no hyphens. - Same format and byte order as string from Schema Registry service. - -- Remaining bytes: Avro payload (in general, format-specific payload) - - - Avro Binary Encoding - NOT Avro Object Container File, which includes the schema and defeats the - purpose of this serialzer to move the schema out of the message payload and - into the schema registry. +Not all messaging services support the same message structure. To enable +integration with such services, the encoder can act on custom message structures +by setting the `messageAdapter` option in the constructor with a corresponding +message producer and consumer. Azure messaging client libraries export default +adapters for their message types. 
## Examples -### Serialize and deserialize +### Encode and decode an `@azure/event-hubs`'s `EventData` ```javascript const { DefaultAzureCredential } = require("@azure/identity"); +import { createEventDataAdapter } from "@azure/event-hubs"; const { SchemaRegistryClient } = require("@azure/schema-registry"); -const { SchemaRegistryAvroSerializer } = require("@azure/schema-registry-avro"); - -const client = new SchemaRegistryClient("", new DefaultAzureCredential()); -const serializer = new SchemaRegistryAvroSerializer(client, { groupName: "" }); +const { SchemaRegistryAvroEncoder } = require("@azure/schema-registry-avro"); + +const client = new SchemaRegistryClient( + "", + new DefaultAzureCredential() +); +const encoder = new SchemaRegistryAvroEncoder(client, { + groupName: "", + messageAdapter: createEventDataAdapter(), +}); // Example Avro schema const schema = JSON.stringify({ type: "record", name: "Rating", namespace: "my.example", - fields: [{ name: "score", type: "int" }] + fields: [{ name: "score", type: "int" }], }); // Example value that matches the Avro schema above const value = { score: 42 }; -// Serialize value to buffer -const buffer = await serializer.serialize(value, schema); +// Encode value to a message +const message = await encoder.encodeMessageData(value, schema); -// Deserialize buffer to value -const deserializedValue = await serializer.deserialize(buffer); +// Decode a message to value +const decodedValue = await encoder.decodeMessageData(message); ``` ## Troubleshooting diff --git a/sdk/schemaregistry/schema-registry-avro/package.json b/sdk/schemaregistry/schema-registry-avro/package.json index 7bc04056d6d6..e481ba9a7bbc 100644 --- a/sdk/schemaregistry/schema-registry-avro/package.json +++ b/sdk/schemaregistry/schema-registry-avro/package.json @@ -79,7 +79,9 @@ "devDependencies": { "@azure/dev-tool": "^1.0.0", "@azure/eslint-plugin-azure-sdk": "^3.0.0", + "@azure/event-hubs": "^5.7.0-beta.2", "@azure/identity": "^2.0.1", + 
"@azure/test-utils": "^1.0.0", "@azure-tools/test-recorder": "^1.0.0", "@microsoft/api-extractor": "^7.18.11", "@rollup/plugin-commonjs": "11.0.2", diff --git a/sdk/schemaregistry/schema-registry-avro/review/schema-registry-avro.api.md b/sdk/schemaregistry/schema-registry-avro/review/schema-registry-avro.api.md index 5dd99b436358..7222e6780ddd 100644 --- a/sdk/schemaregistry/schema-registry-avro/review/schema-registry-avro.api.md +++ b/sdk/schemaregistry/schema-registry-avro/review/schema-registry-avro.api.md @@ -4,21 +4,37 @@ ```ts -/// - import { SchemaRegistry } from '@azure/schema-registry'; // @public -export class SchemaRegistryAvroSerializer { - constructor(client: SchemaRegistry, options?: SchemaRegistryAvroSerializerOptions); - deserialize(input: Buffer | Blob | Uint8Array): Promise; - serialize(value: unknown, schema: string): Promise; +export interface DecodeMessageDataOptions { + schema?: string; +} + +// @public +export interface MessageAdapter { + consumeMessage: (message: MessageT) => MessageWithMetadata; + produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT; +} + +// @public +export interface MessageWithMetadata { + body: Uint8Array; + contentType: string; +} + +// @public +export class SchemaRegistryAvroEncoder { + constructor(client: SchemaRegistry, options?: SchemaRegistryAvroEncoderOptions); + decodeMessageData(message: MessageT, options?: DecodeMessageDataOptions): Promise; + encodeMessageData(value: unknown, schema: string): Promise; } // @public -export interface SchemaRegistryAvroSerializerOptions { +export interface SchemaRegistryAvroEncoderOptions { autoRegisterSchemas?: boolean; groupName?: string; + messageAdapter?: MessageAdapter; } // (No @packageDocumentation comment for this package) diff --git a/sdk/schemaregistry/schema-registry-avro/sample.env b/sdk/schemaregistry/schema-registry-avro/sample.env index 18e56c11ef2f..15f5a8fc6249 100644 --- a/sdk/schemaregistry/schema-registry-avro/sample.env +++ 
b/sdk/schemaregistry/schema-registry-avro/sample.env @@ -9,3 +9,12 @@ SCHEMA_REGISTRY_GROUP= AZURE_TENANT_ID= AZURE_CLIENT_ID= AZURE_CLIENT_SECRET= + +# Used in samples that use Event Hubs. Retrieve these values from an Event Hub in the Azure portal. +EVENTHUB_CONNECTION_STRING= +EVENTHUB_NAME= +CONSUMER_GROUP_NAME= + +# Used in samples that use Event Grid. Retrieve these values from an Event Grid topic in the Azure portal +EVENT_GRID_TOPIC_ENDPOINT= +EVENT_GRID_TOPIC_API_KEY= \ No newline at end of file diff --git a/sdk/schemaregistry/schema-registry-avro/samples-dev/schemaRegistryAvroSample.ts b/sdk/schemaregistry/schema-registry-avro/samples-dev/schemaRegistryAvroSample.ts index 8bd4b39f4f80..e54248ddcf39 100644 --- a/sdk/schemaregistry/schema-registry-avro/samples-dev/schemaRegistryAvroSample.ts +++ b/sdk/schemaregistry/schema-registry-avro/samples-dev/schemaRegistryAvroSample.ts @@ -2,19 +2,22 @@ // Licensed under the MIT License. /** - * @summary Demonstrates the use of SchemaRegistryAvroSerializer to serialize and deserialize using schema from Schema Registry. + * @summary Demonstrates the use of SchemaRegistryAvroEncoder to create messages with avro-encoded payload using schema from Schema Registry. 
 */ import { DefaultAzureCredential } from "@azure/identity"; import { SchemaRegistryClient, SchemaDescription } from "@azure/schema-registry"; -import { SchemaRegistryAvroSerializer } from "@azure/schema-registry-avro"; +import { SchemaRegistryAvroEncoder } from "@azure/schema-registry-avro"; // Load the .env file if it exists import * as dotenv from "dotenv"; dotenv.config(); -// Set these environment variables or edit the following values -const endpoint = process.env["SCHEMA_REGISTRY_ENDPOINT"] || ""; +// The fully qualified namespace for schema registry +const schemaRegistryFullyQualifiedNamespace = + process.env["SCHEMA_REGISTRY_ENDPOINT"] || ""; + +// The schema group to use for schema registration or lookup const groupName = process.env["SCHEMA_REGISTRY_GROUP"] || "AzureSdkSampleGroup"; // Sample Avro Schema for user with first and last names @@ -52,26 +55,29 @@ export async function main() { // Create a new client - const client = new SchemaRegistryClient(endpoint, new DefaultAzureCredential()); + const client = new SchemaRegistryClient( + schemaRegistryFullyQualifiedNamespace, + new DefaultAzureCredential() + ); // Register the schema. This would generally have been done somewhere else. - // You can also skip this step and let serialize automatically register schemas - // using autoRegisterSchemas=true, but that is NOT recommended in production. + // You can also skip this step and let `encodeMessageData` automatically register + // schemas using autoRegisterSchemas=true, but that is NOT recommended in production. 
await client.registerSchema(schemaDescription); - // Create a new serializer backed by the client - const serializer = new SchemaRegistryAvroSerializer(client, { groupName }); + // Create a new encoder backed by the client + const encoder = new SchemaRegistryAvroEncoder(client, { groupName }); - // serialize an object that matches the schema + // encode an object that matches the schema and put it in a message const value: User = { firstName: "Jane", lastName: "Doe" }; - const buffer = await serializer.serialize(value, schema); - console.log("Serialized:"); - console.log(buffer); + const message = await encoder.encodeMessageData(value, schema); + console.log("Created message:"); + console.log(JSON.stringify(message)); - // deserialize the result back to an object - const deserializedValue = (await serializer.deserialize(buffer)) as User; - console.log("Deserialized:"); - console.log(`${deserializedValue.firstName} ${deserializedValue.lastName}`); + // decode the message back to an object + const decodedObject = await encoder.decodeMessageData(message); + console.log("Decoded object:"); + console.log(JSON.stringify(decodedObject as User)); } main().catch((err) => { diff --git a/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsBufferedProducerClient.ts b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsBufferedProducerClient.ts new file mode 100644 index 000000000000..923e7cd4d932 --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsBufferedProducerClient.ts @@ -0,0 +1,107 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates the use of SchemaRegistryAvroEncoder to create messages with avro-encoded payload using schema from Schema Registry and send them to an Event Hub using the EventHub Buffered Producer Client. 
 + */ + +import { DefaultAzureCredential } from "@azure/identity"; +import { SchemaRegistryClient, SchemaDescription } from "@azure/schema-registry"; +import { SchemaRegistryAvroEncoder } from "@azure/schema-registry-avro"; +import { EventHubBufferedProducerClient, createEventDataAdapter } from "@azure/event-hubs"; + +// Load the .env file if it exists +import * as dotenv from "dotenv"; +dotenv.config(); + +// The fully qualified namespace for schema registry +const schemaRegistryFullyQualifiedNamespace = + process.env["SCHEMA_REGISTRY_ENDPOINT"] || ""; + +// The schema group to use for schema registration or lookup +const groupName = process.env["SCHEMA_REGISTRY_GROUP"] || "AzureSdkSampleGroup"; + +// The connection string for Event Hubs +const eventHubsConnectionString = process.env["EVENTHUB_CONNECTION_STRING"] || ""; + +// Sample Avro Schema for user with first and last names +const schemaObject = { + type: "record", + name: "User", + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: "firstName", + type: "string", + }, + { + name: "lastName", + type: "string", + }, + ], +}; + +// Matching TypeScript interface for schema +interface User { + firstName: string; + lastName: string; +} + +const schema = JSON.stringify(schemaObject); + +// Description of the schema for registration +const schemaDescription: SchemaDescription = { + name: `${schemaObject.namespace}.${schemaObject.name}`, + groupName, + format: "Avro", + definition: schema, +}; + +async function handleError(): Promise { + console.log("An error occurred when sending a message"); +} + +export async function main() { + // Create a new client + const schemaRegistryClient = new SchemaRegistryClient( + schemaRegistryFullyQualifiedNamespace, + new DefaultAzureCredential() + ); + + // Register the schema. This would generally have been done somewhere else. 
+ // You can also skip this step and let `encodeMessageData` automatically register + // schemas using autoRegisterSchemas=true, but that is NOT recommended in production. + await schemaRegistryClient.registerSchema(schemaDescription); + + // Create a new encoder backed by the client + const encoder = new SchemaRegistryAvroEncoder(schemaRegistryClient, { + groupName, + messageAdapter: createEventDataAdapter(), + }); + + const eventHubsBufferedProducerClient = new EventHubBufferedProducerClient( + eventHubsConnectionString, + { + onSendEventsErrorHandler: handleError, + } + ); + + // encode an object that matches the schema + const value: User = { firstName: "Jane", lastName: "Doe" }; + const message = await encoder.encodeMessageData(value, schema); + console.log("Created message:"); + console.log(message); + + await eventHubsBufferedProducerClient.enqueueEvent(message); + console.log(`Message was added to the queue and is about to be sent`); + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await eventHubsBufferedProducerClient.close({ flush: true }); + console.log(`Exiting sample`); + }, 30 * 1000); +} + +main().catch((err) => { + console.error("The sample encountered an error:", err); +}); diff --git a/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsConsumerClient.ts b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsConsumerClient.ts new file mode 100644 index 000000000000..be7f83f5c682 --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsConsumerClient.ts @@ -0,0 +1,130 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates the use of SchemaRegistryAvroEncoder to decode messages with avro-encoded payload received from the Event Hub Consumer Client. 
 + */ + +import { DefaultAzureCredential } from "@azure/identity"; +import { SchemaRegistryClient, SchemaDescription } from "@azure/schema-registry"; +import { SchemaRegistryAvroEncoder } from "@azure/schema-registry-avro"; +import { + EventHubConsumerClient, + earliestEventPosition, + createEventDataAdapter, +} from "@azure/event-hubs"; + +// Load the .env file if it exists +import * as dotenv from "dotenv"; +dotenv.config(); + +// The fully qualified namespace for schema registry +const schemaRegistryFullyQualifiedNamespace = + process.env["SCHEMA_REGISTRY_ENDPOINT"] || ""; + +// The schema group to use for schema registration or lookup +const groupName = process.env["SCHEMA_REGISTRY_GROUP"] || "AzureSdkSampleGroup"; + +// The connection string for Event Hubs +const eventHubsConnectionString = process.env["EVENTHUB_CONNECTION_STRING"] || ""; + +// The name of Event Hub the client will connect to +const eventHubName = process.env["EVENTHUB_NAME"] || ""; + +// The name of the Event Hub consumer group from which you want to process events +const consumerGroup = process.env["CONSUMER_GROUP_NAME"] || ""; + +// Sample Avro Schema for user with first and last names +const schemaObject = { + type: "record", + name: "User", + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: "firstName", + type: "string", + }, + { + name: "lastName", + type: "string", + }, + ], +}; + +const schema = JSON.stringify(schemaObject); + +// Description of the schema for registration +const schemaDescription: SchemaDescription = { + name: `${schemaObject.namespace}.${schemaObject.name}`, + groupName, + format: "Avro", + definition: schema, +}; + +export async function main() { + // Create a new client + const schemaRegistryClient = new SchemaRegistryClient( + schemaRegistryFullyQualifiedNamespace, + new DefaultAzureCredential() + ); + + // Register the schema. This would generally have been done somewhere else. 
+ // You can also skip this step and let `encodeMessageData` automatically register + // schemas using autoRegisterSchemas=true, but that is NOT recommended in production. + await schemaRegistryClient.registerSchema(schemaDescription); + + // Create a new encoder backed by the client + const encoder = new SchemaRegistryAvroEncoder(schemaRegistryClient, { + groupName, + messageAdapter: createEventDataAdapter(), + }); + + const eventHubConsumerClient = new EventHubConsumerClient( + consumerGroup, + eventHubsConnectionString, + eventHubName + ); + + const subscription = eventHubConsumerClient.subscribe( + { + // The callback where you add your code to process incoming events + processEvents: async (events, context) => { + // Note: It is possible for `events` to be an empty array. + // This can happen if there were no new events to receive + // in the `maxWaitTimeInSeconds`, which is defaulted to + // 60 seconds. + // The `maxWaitTimeInSeconds` can be changed by setting + // it in the `options` passed to `subscribe()`. 
+ for (const event of events) { + console.log( + `Received event: '${JSON.stringify(event)}' from partition: '${ + context.partitionId + }' and consumer group: '${context.consumerGroup}'` + ); + if (event.contentType !== undefined && event.body) { + const contentTypeParts = event.contentType.split("+"); + if (contentTypeParts[0] === "avro/binary") { + const decodedEvent = await encoder.decodeMessageData(event); + console.log(`Decoded message: '${JSON.stringify(decodedEvent)}'`); + } + } + } + }, + processError: async (err, context) => { + console.log(`Error on partition "${context.partitionId}": ${err}`); + }, + }, + { startPosition: earliestEventPosition } + ); + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await subscription.close(); + await eventHubConsumerClient.close(); + console.log(`Exiting sample`); + }, 30 * 1000); +} + +main().catch((err) => { + console.error("The sample encountered an error:", err); +}); diff --git a/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsProducerClient.ts b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsProducerClient.ts new file mode 100644 index 000000000000..37f183236d1d --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/samples-dev/withEventHubsProducerClient.ts @@ -0,0 +1,175 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +/** + * @summary Demonstrates the use of SchemaRegistryAvroEncoder to create messages with avro-encoded payload using schema from Schema Registry and send them to an Event Hub using the EventHub Producer Client. 
 + */ + +import { DefaultAzureCredential } from "@azure/identity"; +import { SchemaRegistryClient, SchemaDescription } from "@azure/schema-registry"; +import { SchemaRegistryAvroEncoder } from "@azure/schema-registry-avro"; +import { EventHubProducerClient, createEventDataAdapter } from "@azure/event-hubs"; + +// Load the .env file if it exists +import * as dotenv from "dotenv"; +dotenv.config(); + +// The fully qualified namespace for schema registry +const schemaRegistryFullyQualifiedNamespace = + process.env["SCHEMA_REGISTRY_ENDPOINT"] || ""; + +// The schema group to use for schema registration or lookup +const groupName = process.env["SCHEMA_REGISTRY_GROUP"] || "AzureSdkSampleGroup"; + +// The connection string for Event Hubs +const eventHubsConnectionString = process.env["EVENTHUB_CONNECTION_STRING"] || ""; + +// The name of Event Hub the client will connect to +const eventHubName = process.env["EVENTHUB_NAME"] || ""; + +// Sample Avro Schema for user with first and last names +const schemaObject = { + type: "record", + name: "User", + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: "firstName", + type: "string", + }, + { + name: "lastName", + type: "string", + }, + ], +}; + +// Matching TypeScript interface for schema +interface User { + firstName: string; + lastName: string; +} + +const schema = JSON.stringify(schemaObject); + +// Description of the schema for registration +const schemaDescription: SchemaDescription = { + name: `${schemaObject.namespace}.${schemaObject.name}`, + groupName, + format: "Avro", + definition: schema, +}; + +export async function main() { + // Create a new client + const schemaRegistryClient = new SchemaRegistryClient( + schemaRegistryFullyQualifiedNamespace, + new DefaultAzureCredential() + ); + + // Register the schema. This would generally have been done somewhere else. 
+ // You can also skip this step and let `encodeMessageData` automatically register + // schemas using autoRegisterSchemas=true, but that is NOT recommended in production. + await schemaRegistryClient.registerSchema(schemaDescription); + + // Create a new encoder backed by the client + const encoder = new SchemaRegistryAvroEncoder(schemaRegistryClient, { + groupName, + messageAdapter: createEventDataAdapter(), + }); + + const eventHubsProducerClient = new EventHubProducerClient( + eventHubsConnectionString, + eventHubName + ); + + // encode an object that matches the schema + const value: User = { firstName: "Joe", lastName: "Doe" }; + const message = await encoder.encodeMessageData(value, schema); + console.log("Created message:"); + console.log(message); + + const eventsToSend = [message]; + + // By not specifying a partition ID or a partition key we allow the server to choose + // which partition will accept this message. + // + // This pattern works well if the consumers of your events do not have any particular + // requirements about the ordering of batches against other batches or if you don't care + // which messages are assigned to which partition. + // + // If you would like more control you can pass either a `partitionKey` or a `partitionId` + // into the createBatch() `options` parameter which will allow you full control over the + // destination. + const batchOptions = { + // The maxSizeInBytes lets you manually control the size of the batch. + // if this is not set we will get the maximum batch size from Event Hubs. + // + // For this sample you can change the batch size to see how different parts + // of the sample handle batching. In production we recommend using the default + // and not specifying a maximum size. 
+ // + // maxSizeInBytes: 200 + }; + + let batch = await eventHubsProducerClient.createBatch(batchOptions); + + let numEventsSent = 0; + + // add events to our batch + let i = 0; + + while (i < eventsToSend.length) { + // messages can fail to be added to the batch if they exceed the maximum size configured for + // the EventHub. + const isAdded = batch.tryAdd(eventsToSend[i]); + + if (isAdded) { + console.log(`Added a message with index ${i} to the batch`); + ++i; + continue; + } + + if (batch.count === 0) { + // If we can't add it and the batch is empty that means the message we're trying to send + // is too large, even when it would be the _only_ message in the batch. + // + // At this point you'll need to decide if you're okay with skipping this message entirely + // or find some way to shrink it. + console.log(`Message was too large and can't be sent until it's made smaller. Skipping...`); + ++i; + continue; + } + + // otherwise this just signals a good spot to send our batch + console.log(`Batch is full - sending ${batch.count} messages as a single batch.`); + await eventHubsProducerClient.sendBatch(batch); + numEventsSent += batch.count; + + // and create a new one to house the next set of messages + batch = await eventHubsProducerClient.createBatch(batchOptions); + } + + // send any remaining messages, if any. 
+ if (batch.count > 0) { + console.log(`Sending remaining ${batch.count} messages as a single batch.`); + await eventHubsProducerClient.sendBatch(batch); + numEventsSent += batch.count; + } + + console.log(`Sent ${numEventsSent} events`); + + if (numEventsSent !== eventsToSend.length) { + throw new Error(`Not all messages were sent (${numEventsSent}/${eventsToSend.length})`); + } + + // Wait for a bit before cleaning up the sample + setTimeout(async () => { + await eventHubsProducerClient.close(); + console.log(`Exiting sample`); + }, 3 * 1000); +} + +main().catch((err) => { + console.error("The sample encountered an error:", err); +}); diff --git a/sdk/schemaregistry/schema-registry-avro/src/index.ts b/sdk/schemaregistry/schema-registry-avro/src/index.ts index 54cefa3b5d8e..5c6530a707e3 100644 --- a/sdk/schemaregistry/schema-registry-avro/src/index.ts +++ b/sdk/schemaregistry/schema-registry-avro/src/index.ts @@ -1,7 +1,6 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -export { - SchemaRegistryAvroSerializer, - SchemaRegistryAvroSerializerOptions, -} from "./schemaRegistryAvroSerializer"; +export { SchemaRegistryAvroEncoder } from "./schemaRegistryAvroEncoder"; + +export * from "./models"; diff --git a/sdk/schemaregistry/schema-registry-avro/src/models.ts b/sdk/schemaregistry/schema-registry-avro/src/models.ts new file mode 100644 index 000000000000..1482e7ef92df --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/src/models.ts @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +/** + * A message that contains binary data and a content type. 
+ */
+export interface MessageWithMetadata {
+  /**
+   * The message's binary data
+   */
+  body: Uint8Array;
+  /**
+   * The message's content type
+   */
+  contentType: string;
+}
+
+/**
+ * MessageAdapter is an interface that converts between a concrete message type
+ * and a message with metadata
+ */
+export interface MessageAdapter<MessageT> {
+  /**
+   * defines how to create a message from a payload and a content type
+   */
+  produceMessage: (messageWithMetadata: MessageWithMetadata) => MessageT;
+  /**
+   * defines how to access the payload and the content type of a message
+   */
+  consumeMessage: (message: MessageT) => MessageWithMetadata;
+}
+
+/**
+ * Options for the SchemaRegistryAvroEncoder.
+ */
+export interface SchemaRegistryAvroEncoderOptions<MessageT = MessageWithMetadata> {
+  /**
+   * When true, register new schemas passed to encodeMessageData. Otherwise, and by
+   * default, fail if schema has not already been registered.
+   *
+   * Automatic schema registration is NOT recommended for production scenarios.
+   */
+  autoRegisterSchemas?: boolean;
+  /**
+   * The group name to be used when registering/looking up a schema. Must be specified
+   * if `encodeMessageData` will be called.
+   */
+  groupName?: string;
+  /**
+   * Message Adapter enables the encoder to produce and consume custom messages.
+   */
+  messageAdapter?: MessageAdapter<MessageT>;
+}
+
+/**
+ * The options to the decodeMessageData method.
+ */
+export interface DecodeMessageDataOptions {
+  /**
+   * The schema to be used for decoding.
+   */
+  schema?: string;
+}
diff --git a/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts b/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts
new file mode 100644
index 000000000000..1ac215029a63
--- /dev/null
+++ b/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts
@@ -0,0 +1,218 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+import * as avro from "avsc";
+import {
+  DecodeMessageDataOptions,
+  MessageAdapter,
+  MessageWithMetadata,
+  SchemaRegistryAvroEncoderOptions,
+} from "./models";
+import { SchemaDescription, SchemaRegistry } from "@azure/schema-registry";
+import { isMessageWithMetadata } from "./utility";
+
+interface CacheEntry {
+  /** Schema ID */
+  id: string;
+
+  /** avsc-specific representation for schema */
+  type: avro.Type;
+}
+
+const avroMimeType = "avro/binary";
+
+/**
+ * Avro encoder that obtains schemas from a schema registry and does not
+ * pack schemas into its payloads.
+ */
+export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
+  /**
+   * Creates a new encoder.
+   *
+   * @param client - Schema Registry where schemas are registered and obtained.
+   * Usually this is a SchemaRegistryClient instance.
+   */
+  constructor(client: SchemaRegistry, options?: SchemaRegistryAvroEncoderOptions<MessageT>) {
+    this.registry = client;
+    this.schemaGroup = options?.groupName;
+    this.autoRegisterSchemas = options?.autoRegisterSchemas ?? false;
+    this.messageAdapter = options?.messageAdapter;
+  }
+
+  private readonly schemaGroup?: string;
+  private readonly registry: SchemaRegistry;
+  private readonly autoRegisterSchemas: boolean;
+  private readonly messageAdapter?: MessageAdapter<MessageT>;
+
+  // REVIEW: signature.
+  //
+  // - Should we wrap all errors thrown by avsc to avoid having our exception
+  //   contract being tied to its implementation details?
+  /**
+   * encodes the value parameter according to the input schema and creates a message
+   * with the encoded data.
+   *
+   * @param value - The value to encode.
+   * @param schema - The Avro schema to use.
+   * @returns A new message with the encoded value. The structure of message is
+   * controlled by the message adapter option.
+ */
+  async encodeMessageData(value: unknown, schema: string): Promise<MessageT> {
+    const entry = await this.getSchemaByDefinition(schema);
+    const buffer = entry.type.toBuffer(value);
+    const payload = new Uint8Array(
+      buffer.buffer,
+      buffer.byteOffset,
+      buffer.byteLength / Uint8Array.BYTES_PER_ELEMENT
+    );
+    const contentType = `${avroMimeType}+${entry.id}`;
+    return this.messageAdapter
+      ? this.messageAdapter.produceMessage({
+          contentType,
+          body: payload,
+        })
+      : /**
+         * If no message adapter was provided, then a MessageWithMetadata will be
+         * returned. This should work because the MessageT type parameter defaults
+         * to MessageWithMetadata.
+         */
+        ({
+          body: payload,
+          contentType: contentType,
+        } as unknown as MessageT);
+  }
+
+  /**
+   * Decodes the payload of the message using the schema ID in the content type
+   * field if no schema was provided.
+   *
+   * @param message - The message with the payload to be decoded.
+   * @param options - Decoding options.
+   * @returns The decoded value.
+ */
+  async decodeMessageData(
+    message: MessageT,
+    options: DecodeMessageDataOptions = {}
+  ): Promise<unknown> {
+    const { schema: readerSchema } = options;
+    const { body, contentType } = getPayloadAndContent(message, this.messageAdapter);
+    const buffer = Buffer.from(body);
+    const writerSchemaId = getSchemaId(contentType);
+    const writerSchema = await this.getSchema(writerSchemaId);
+    if (readerSchema) {
+      const avscReaderSchema = this.getAvroTypeForSchema(readerSchema);
+      const resolver = avscReaderSchema.createResolver(writerSchema.type);
+      return avscReaderSchema.fromBuffer(buffer, resolver, true);
+    } else {
+      return writerSchema.type.fromBuffer(buffer);
+    }
+  }
+
+  private readonly cacheBySchemaDefinition = new Map<string, CacheEntry>();
+  private readonly cacheById = new Map<string, CacheEntry>();
+
+  private async getSchema(schemaId: string): Promise<CacheEntry> {
+    const cached = this.cacheById.get(schemaId);
+    if (cached) {
+      return cached;
+    }
+
+    const schemaResponse = await this.registry.getSchema(schemaId);
+    if (!schemaResponse) {
+      throw new Error(`Schema with ID '${schemaId}' not found.`);
+    }
+
+    if (!schemaResponse.properties.format.match(/^avro$/i)) {
+      throw new Error(
+        `Schema with ID '${schemaResponse.properties.id}' has format '${schemaResponse.properties.format}', not 'avro'.`
+      );
+    }
+
+    const avroType = this.getAvroTypeForSchema(schemaResponse.definition);
+    return this.cache(schemaId, schemaResponse.definition, avroType);
+  }
+
+  private async getSchemaByDefinition(schema: string): Promise<CacheEntry> {
+    const cached = this.cacheBySchemaDefinition.get(schema);
+    if (cached) {
+      return cached;
+    }
+
+    const avroType = this.getAvroTypeForSchema(schema);
+    if (!avroType.name) {
+      throw new Error("Schema must have a name.");
+    }
+
+    if (!this.schemaGroup) {
+      throw new Error(
+        "Schema group must have been specified in the constructor options when the client was created in order to encode."
+      );
+    }
+
+    const description: SchemaDescription = {
+      groupName: this.schemaGroup,
+      name: avroType.name,
+      format: "Avro",
+      definition: schema,
+    };
+
+    let id: string;
+    if (this.autoRegisterSchemas) {
+      id = (await this.registry.registerSchema(description)).id;
+    } else {
+      try {
+        id = (await this.registry.getSchemaProperties(description)).id;
+      } catch (e) {
+        if (e.statusCode === 404) {
+          throw new Error(
+            `Schema '${description.name}' not found in registry group '${description.groupName}', or not found to have matching definition.`
+          );
+        } else {
+          throw e;
+        }
+      }
+    }
+
+    return this.cache(id, schema, avroType);
+  }
+
+  private cache(id: string, schema: string, type: avro.Type): CacheEntry {
+    const entry = { id, type };
+    this.cacheBySchemaDefinition.set(schema, entry);
+    this.cacheById.set(id, entry);
+    return entry;
+  }
+
+  private getAvroTypeForSchema(schema: string): avro.Type {
+    return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true });
+  }
+}
+
+function getSchemaId(contentType: string): string {
+  const contentTypeParts = contentType.split("+");
+  if (contentTypeParts.length !== 2) {
+    throw new Error("Content type was not in the expected format of MIME type + schema ID");
+  }
+  if (contentTypeParts[0] !== avroMimeType) {
+    throw new Error(
+      `Received content of type ${contentTypeParts[0]} but an avro encoder may only be used on content that is of '${avroMimeType}' type`
+    );
+  }
+  return contentTypeParts[1];
+}
+
+function getPayloadAndContent<MessageT>(
+  message: MessageT,
+  messageAdapter?: MessageAdapter<MessageT>
+): MessageWithMetadata {
+  const messageConsumer = messageAdapter?.consumeMessage;
+  if (messageConsumer) {
+    return messageConsumer(message);
+  } else if (isMessageWithMetadata(message)) {
+    return message;
+  } else {
+    throw new Error(
+      `Either the messageConsumer option should be defined or the message should have body and contentType fields`
+    );
+  }
+}
diff --git 
a/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroSerializer.ts b/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroSerializer.ts deleted file mode 100644 index 02af75187e13..000000000000 --- a/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroSerializer.ts +++ /dev/null @@ -1,234 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -import { SchemaDescription, SchemaRegistry } from "@azure/schema-registry"; -import * as avro from "avsc"; -import { toUint8Array } from "./utils/buffer"; - -// REVIEW: This should go in to a shared doc somewhere that all of the different -// language serializer's docs can reference. -// -// Wire format -// ----------- -// -// This is a standard meant to be reused across schema registry serializers for -// different format. We only have an avro serializer at this point so picking -// apart this format is inlined here, but handling could be extracted and shared -// between serializers of different formats in the future. -// -// - [4 bytes: Format Indicator] -// - Currently always zero to indicate format below. -// -// - [32 bytes: Schema ID] -// - UTF-8 hexadecimal representation of GUID. -// - 32 hex digits, no hyphens. -// - Same format and byte order as string from Schema Registry service. -// -// - [Remaining bytes: Avro payload (in general, format-specific payload)] -// - Avro Binary Encoding -// - NOT Avro Object Container File, which includes the schema and defeats -// the purpose of this serializer to move the schema out of the message -// payload and into the schema registry. 
-// -const FORMAT_INDICATOR = 0; -const SCHEMA_ID_OFFSET = 4; -const SCHEMA_ID_LENGTH = 32; -const PAYLOAD_OFFSET = 36; - -interface CacheEntry { - /** Schema ID */ - id: string; - - /** avsc-specific representation for schema */ - type: avro.Type; -} - -/** - * Options for Schema - */ -export interface SchemaRegistryAvroSerializerOptions { - /** - * When true, register new schemas passed to serialize. Otherwise, and by - * default, fail if schema has not already been registered. - * - * Automatic schema registration is NOT recommended for production scenarios. - */ - autoRegisterSchemas?: boolean; - /** - * The group name to be used when registering/looking up a schema. Must be specified - * if you will be calling `serialize`. - */ - groupName?: string; -} - -/** - * Avro serializer that obtains schemas from a schema registry and does not - * pack schemas into its payloads. - */ -export class SchemaRegistryAvroSerializer { - /** - * Creates a new serializer. - * - * @param client - Schema Registry where schemas are registered and obtained. - * Usually this is a SchemaRegistryClient instance. - */ - constructor(client: SchemaRegistry, options?: SchemaRegistryAvroSerializerOptions) { - this.registry = client; - this.schemaGroup = options?.groupName; - this.autoRegisterSchemas = options?.autoRegisterSchemas ?? false; - } - - private readonly schemaGroup?: string; - private readonly registry: SchemaRegistry; - private readonly autoRegisterSchemas: boolean; - - // REVIEW: signature. - // - // - Better to serialize into a stream? I aborted that for now as I wanted to - // do the simplest thing that could possibly work first to make sure there - // were no blockers in our dependencies. I also wanted to get feedback on - // what the API shape should be before diving into that. - // - // - This type should ultimately be able to implement a core ObjectSerializer - // interface. Do we know what that would look like? 
Maybe it takes `any` as - // the format-specific schema/type info? Or does it always take a - // format-specific schema string? - // - // The C#/Java approach of passing Type and assuming every serializer can - // get its schema by reflecting on the type does not work for JavaScript. We - // need to support arbitrary objects that match a schema. - // - // Maybe each format expects a different property on this arg so that you - // could at least pass enough info for multiple formats, and then your call - // to ObjectSerializer is at least not tied to a single format? - // - // - Should we wrap all errors thrown by avsc to avoid having our exception // - // contract being tied to its implementation details? - /** - * Serializes a value into a buffer. - * - * @param value - The value to serialize. - * @param schema - The Avro schema to use. - * @returns A new buffer with the serialized value - */ - async serialize(value: unknown, schema: string): Promise { - const entry = await this.getSchemaByDefinition(schema); - const payload = entry.type.toBuffer(value); - const buffer = Buffer.alloc(PAYLOAD_OFFSET + payload.length); - - buffer.writeUInt32BE(FORMAT_INDICATOR, 0); - buffer.write(entry.id, SCHEMA_ID_OFFSET, SCHEMA_ID_LENGTH, "utf-8"); - payload.copy(buffer, PAYLOAD_OFFSET); - return new Uint8Array( - buffer.buffer, - buffer.byteOffset, - buffer.byteLength / Uint8Array.BYTES_PER_ELEMENT - ); - } - - // REVIEW: signature. See serialize and s/serialize into/deserialize from/. - /** - * Deserializes a value from a buffer. - * - * @param buffer - The buffer with the serialized value. - * @returns The deserialized value. - */ - async deserialize(input: Buffer | Blob | Uint8Array): Promise { - const arr8 = await toUint8Array(input); - const buffer = Buffer.isBuffer(arr8) ? 
arr8 : Buffer.from(arr8); - if (buffer.length < PAYLOAD_OFFSET) { - throw new RangeError("Buffer is too small to have the correct format."); - } - - const format = buffer.readUInt32BE(0); - if (format !== FORMAT_INDICATOR) { - throw new TypeError(`Buffer has unknown format indicator: 0x${format.toString(16)}.`); - } - - const schemaIdBuffer = buffer.slice(SCHEMA_ID_OFFSET, PAYLOAD_OFFSET); - const schemaId = schemaIdBuffer.toString("utf-8"); - const schema = await this.getSchema(schemaId); - const payloadBuffer = buffer.slice(PAYLOAD_OFFSET); - - return schema.type.fromBuffer(payloadBuffer); - } - - private readonly cacheBySchemaDefinition = new Map(); - private readonly cacheById = new Map(); - - private async getSchema(schemaId: string): Promise { - const cached = this.cacheById.get(schemaId); - if (cached) { - return cached; - } - - const schemaResponse = await this.registry.getSchema(schemaId); - if (!schemaResponse) { - throw new Error(`Schema with ID '${schemaId}' not found.`); - } - - if (!schemaResponse.properties.format.match(/^avro$/i)) { - throw new Error( - `Schema with ID '${schemaResponse.properties.id}' has format '${schemaResponse.properties.format}', not 'avro'.` - ); - } - - const avroType = this.getAvroTypeForSchema(schemaResponse.definition); - return this.cache(schemaId, schemaResponse.definition, avroType); - } - - private async getSchemaByDefinition(schema: string): Promise { - const cached = this.cacheBySchemaDefinition.get(schema); - if (cached) { - return cached; - } - - const avroType = this.getAvroTypeForSchema(schema); - if (!avroType.name) { - throw new Error("Schema must have a name."); - } - - if (!this.schemaGroup) { - throw new Error( - "Schema group must have been specified in the constructor options when the client was created in order to serialize." 
- ); - } - - const description: SchemaDescription = { - groupName: this.schemaGroup, - name: avroType.name, - format: "Avro", - definition: schema, - }; - - let id: string; - if (this.autoRegisterSchemas) { - id = (await this.registry.registerSchema(description)).id; - } else { - try { - id = (await this.registry.getSchemaProperties(description)).id; - } catch (e) { - if (e.statusCode === 404) { - throw new Error( - `Schema '${description.name}' not found in registry group '${description.groupName}', or not found to have matching definition.` - ); - } else { - throw e; - } - } - } - - return this.cache(id, schema, avroType); - } - - private cache(id: string, schema: string, type: avro.Type): CacheEntry { - const entry = { id, type }; - this.cacheBySchemaDefinition.set(schema, entry); - this.cacheById.set(id, entry); - return entry; - } - - private getAvroTypeForSchema(schema: string): avro.Type { - return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true }); - } -} diff --git a/sdk/schemaregistry/schema-registry-avro/src/utility.ts b/sdk/schemaregistry/schema-registry-avro/src/utility.ts new file mode 100644 index 000000000000..9578eeb39b6b --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/src/utility.ts @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +import { MessageWithMetadata } from "./models"; + +export function isMessageWithMetadata(message: unknown): message is MessageWithMetadata { + const castMessage = message as MessageWithMetadata; + return castMessage.body !== undefined && castMessage.contentType !== undefined; +} diff --git a/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.browser.ts b/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.browser.ts deleted file mode 100644 index 9d1a069df24f..000000000000 --- a/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.browser.ts +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright (c) Microsoft Corporation. 
-// Licensed under the MIT license. - -async function blobToArrayBuffer(blob: Blob): Promise { - if ("arrayBuffer" in blob) { - return blob.arrayBuffer(); - } - return new Promise((resolve, reject) => { - const reader = new FileReader(); - reader.onload = () => resolve(reader.result as ArrayBuffer); - reader.onerror = () => reject; - reader.readAsArrayBuffer(blob); - }); -} - -/** - * @param input - Input to `deserialize`. - * @returns Promise which completes with the input data as a Uint8Array. - */ -export async function toUint8Array(input: Uint8Array | Buffer | Blob): Promise { - // If this is not a Uint8Array, assume it's a blob and retrieve an ArrayBuffer from the blob. - if ((input as any).byteLength === undefined) { - return new Uint8Array(await blobToArrayBuffer(input as Blob)); - } - return input as Uint8Array; -} diff --git a/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.ts b/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.ts deleted file mode 100644 index f7a07c5d8693..000000000000 --- a/sdk/schemaregistry/schema-registry-avro/src/utils/buffer.ts +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. - -/** - * @param input - Input to `deserialize`. - * @returns Promise which completes with the input data as a Uint8Array. - */ -export async function toUint8Array(input: Uint8Array | Buffer | Blob): Promise { - if ((input as any).byteLength === undefined) { - throw TypeError("Blob is unsupported in node."); - } - return input as Uint8Array; -} diff --git a/sdk/schemaregistry/schema-registry-avro/test/messageAdapter.spec.ts b/sdk/schemaregistry/schema-registry-avro/test/messageAdapter.spec.ts new file mode 100644 index 000000000000..b24dcec87a81 --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/test/messageAdapter.spec.ts @@ -0,0 +1,84 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +import { EventData, createEventDataAdapter, EventDataAdapterParameters } from "@azure/event-hubs"; +import { matrix } from "@azure/test-utils"; +import { MessageAdapter } from "../src/models"; +import { assert } from "chai"; + +/** + * A type predicate to check whether two record types have the same keys + */ +type AssertEqualKeys, T2 extends Record> = [ + keyof T1 extends keyof T2 ? 1 : 0, + keyof T2 extends keyof T1 ? 1 : 0 +] extends [1, 1] + ? true + : false; + +function isMessageAdapter(obj: any): obj is MessageAdapter { + return typeof obj.produceMessage === "function" && typeof obj.consumeMessage === "function"; +} + +/** + * some tests consume messages with well-formed Uint8Array payload so this variable + * is used to construct those. + */ +const dummyUint8Array = Uint8Array.from([0]); + +/** + * An interface to group different bits needed by the tests for each adapter + * factory + */ +interface AdapterTestInfo { + adapterFactory: MessageAdapter; + nonUint8ArrayMessage: T; + adapterFactoryName: string; +} + +const eventDataAdapterTestInfo: AdapterTestInfo = { + adapterFactory: createEventDataAdapter(), + nonUint8ArrayMessage: { + body: "", + contentType: "", + }, + adapterFactoryName: createEventDataAdapter.name, +}; + +describe("Message Adapters", function () { + describe("Input types for message adapter factories are sound", function () { + it("EventDataAdapterParameters", function () { + const areEqual: AssertEqualKeys< + EventDataAdapterParameters, + Omit + > = true; + assert.isTrue( + areEqual, + 'EventDataAdapterParameters should have the same shape as Omit.' 
+ ); + }); + }); + matrix([[eventDataAdapterTestInfo]] as const, async (adapterTestInfo: AdapterTestInfo) => { + describe(adapterTestInfo.adapterFactoryName, function () { + const adapter = adapterTestInfo.adapterFactory; + it("implements MessageAdapter", async () => { + assert.isTrue(isMessageAdapter(adapter), `should create a valid MessageAdapter`); + }); + it("consumeMessage rejects non-Uint8Array body", async () => { + assert.throws( + () => adapter.consumeMessage(adapterTestInfo.nonUint8ArrayMessage), + /Expected the body field to be defined and have a Uint8Array/ + ); + }); + it("consumeMessage rejects messages with no contentType", async () => { + assert.throws( + () => + adapter.consumeMessage({ + body: dummyUint8Array, + }), + /Expected the contentType field to be defined/ + ); + }); + }); + }); +}); diff --git a/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts b/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts new file mode 100644 index 000000000000..4deb210f1278 --- /dev/null +++ b/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts @@ -0,0 +1,192 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
+ +import { assert, use as chaiUse } from "chai"; +import { createTestEncoder, registerTestSchema } from "./utils/mockedEncoder"; +import { testAvroType, testGroup, testSchema, testSchemaIds, testValue } from "./utils/dummies"; +import chaiPromises from "chai-as-promised"; +import { createTestRegistry } from "./utils/mockedRegistryClient"; + +chaiUse(chaiPromises); + +describe("SchemaRegistryAvroEncoder", function () { + it("rejects invalid format", async () => { + const encoder = await createTestEncoder(); + await assert.isRejected( + encoder.decodeMessageData({ + body: Buffer.alloc(1), + contentType: "application/json+1234", + }), + /application\/json.*avro\/binary/ + ); + }); + + it("rejects schema with no name", async () => { + const encoder = await createTestEncoder(); + const schema = JSON.stringify({ type: "record", fields: [] }); + await assert.isRejected(encoder.encodeMessageData({}, schema), /name/); + }); + + it("rejects a schema with different format", async () => { + const registry = createTestRegistry(true); // true means never live, we can't register non-avro schema in live service + const encoder = await createTestEncoder(false, registry); + const schema = await registry.registerSchema({ + name: "_", + definition: "_", + format: "NotAvro", + groupName: testGroup, + }); + + await assert.isRejected( + encoder.decodeMessageData({ + body: Buffer.alloc(1), + contentType: `avro/binary+${schema.id}`, + }), + new RegExp(`${schema.id}.*NotAvro.*avro`) + ); + }); + + it("rejects encoding when schema is not found", async () => { + const encoder = await createTestEncoder(false); + const schema = JSON.stringify({ + type: "record", + name: "NeverRegistered", + namespace: "my.example", + fields: [{ name: "count", type: "int" }], + }); + await assert.isRejected(encoder.encodeMessageData({ count: 42 }, schema), /not found/); + }); + + it("rejects decoding when schema is not found", async () => { + const encoder = await createTestEncoder(false); + const payload = 
testAvroType.toBuffer(testValue); + await assert.isRejected( + encoder.decodeMessageData({ + body: payload, + contentType: `avro/binary+${testSchemaIds[1]}`, + }), + /not found/ + ); + }); + + it("encodes to the expected format", async () => { + const registry = createTestRegistry(); + const schemaId = await registerTestSchema(registry); + const encoder = await createTestEncoder(false, registry); + const message = await encoder.encodeMessageData(testValue, testSchema); + assert.isUndefined((message.body as Buffer).readBigInt64BE); + const buffer = Buffer.from(message.body); + assert.strictEqual(`avro/binary+${schemaId}`, message.contentType); + assert.deepStrictEqual(testAvroType.fromBuffer(buffer), testValue); + }); + + it("decodes from the expected format", async () => { + const registry = createTestRegistry(); + const schemaId = await registerTestSchema(registry); + const encoder = await createTestEncoder(false, registry); + const payload = testAvroType.toBuffer(testValue); + assert.deepStrictEqual( + await encoder.decodeMessageData({ + body: payload, + contentType: `avro/binary+${schemaId}`, + }), + testValue + ); + }); + + it("encodes and decodes in round trip", async () => { + let encoder = await createTestEncoder(); + let message = await encoder.encodeMessageData(testValue, testSchema); + assert.deepStrictEqual(await encoder.decodeMessageData(message), testValue); + + // again for cache hit coverage on encodeMessageData + message = await encoder.encodeMessageData(testValue, testSchema); + assert.deepStrictEqual(await encoder.decodeMessageData(message), testValue); + + // throw away encoder for cache miss coverage on decodeMessageData + encoder = await createTestEncoder(false); + assert.deepStrictEqual(await encoder.decodeMessageData(message), testValue); + + // throw away encoder again and cover getSchemaProperties instead of registerSchema + encoder = await createTestEncoder(false); + assert.deepStrictEqual(await encoder.encodeMessageData(testValue, 
testSchema), message); + }); + + it("works with trivial example in README", async () => { + const encoder = await createTestEncoder(); + + // Example Avro schema + const schema = JSON.stringify({ + type: "record", + name: "Rating", + namespace: "my.example", + fields: [{ name: "score", type: "int" }], + }); + + // Example value that matches the Avro schema above + const value = { score: 42 }; + + // encode value to a message + const message = await encoder.encodeMessageData(value, schema); + + // Decode message to value + const decodedValue = await encoder.decodeMessageData(message); + + assert.deepStrictEqual(decodedValue, value); + }); + + it("decodes from a compatible reader schema", async () => { + const encoder = await createTestEncoder(); + const message = await encoder.encodeMessageData(testValue, testSchema); + const decodedValue: any = await encoder.decodeMessageData(message, { + /** + * This schema is missing the favoriteNumber field that exists in the writer schema + * and adds an "age" field with a default value. 
+ */ + schema: JSON.stringify({ + type: "record", + name: "AvroUser", + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: "name", + type: "string", + }, + { + name: "age", + type: "int", + default: 30, + }, + ], + }), + }); + assert.isUndefined(decodedValue.favoriteNumber); + assert.equal(decodedValue.name, testValue.name); + assert.equal(decodedValue.age, 30); + }); + + it("fails to decode from an incompatible reader schema", async () => { + const encoder = await createTestEncoder(); + const message = await encoder.encodeMessageData(testValue, testSchema); + assert.isRejected( + encoder.decodeMessageData(message, { + schema: JSON.stringify({ + type: "record", + name: "AvroUser", + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: "name", + type: "string", + }, + { + name: "age", + type: "int", + }, + ], + }), + }), + /no matching field for default-less com.azure.schemaregistry.samples.AvroUser.age/ + ); + }); +}); diff --git a/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroSerializer.spec.ts b/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroSerializer.spec.ts deleted file mode 100644 index 2f9082d7d632..000000000000 --- a/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroSerializer.spec.ts +++ /dev/null @@ -1,134 +0,0 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT license. 
- -import { assert, use as chaiUse } from "chai"; -import chaiPromises from "chai-as-promised"; -import { testAvroType, testGroup, testSchema, testSchemaIds, testValue } from "./utils/dummies"; -import { createTestRegistry } from "./utils/mockedRegistryClient"; -import { createTestSerializer, registerTestSchema } from "./utils/mockedSerializer"; - -chaiUse(chaiPromises); - -describe("SchemaRegistryAvroSerializer", function () { - it("rejects buffers that are too small", async () => { - const serializer = await createTestSerializer(); - await assert.isRejected(serializer.deserialize(Buffer.alloc(3)), /small/); - }); - - it("rejects invalid format", async () => { - const serializer = await createTestSerializer(); - const buffer = Buffer.alloc(42); - buffer.writeUInt32BE(0x1234, 0); - await assert.isRejected(serializer.deserialize(buffer), /format.*0x1234/); - }); - - it("rejects schema with no name", async () => { - const serializer = await createTestSerializer(); - const schema = JSON.stringify({ type: "record", fields: [] }); - await assert.isRejected(serializer.serialize({}, schema), /name/); - }); - - it("rejects a schema with different format", async () => { - const registry = createTestRegistry(true); // true means never live, we can't register non-avro schema in live service - const serializer = await createTestSerializer(false, registry); - const schema = await registry.registerSchema({ - name: "_", - definition: "_", - format: "NotAvro", - groupName: testGroup, - }); - - const buffer = Buffer.alloc(36); - buffer.write(schema.id, 4, 32, "utf-8"); - await assert.isRejected( - serializer.deserialize(buffer), - new RegExp(`${schema.id}.*NotAvro.*avro`) - ); - }); - - it("rejects serialization when schema is not found", async () => { - const serializer = await createTestSerializer(false); - const schema = JSON.stringify({ - type: "record", - name: "NeverRegistered", - namespace: "my.example", - fields: [{ name: "count", type: "int" }], - }); - await 
assert.isRejected(serializer.serialize({ count: 42 }, schema), /not found/); - }); - - it("rejects deserialization when schema is not found", async () => { - const serializer = await createTestSerializer(false); - const payload = testAvroType.toBuffer(testValue); - const buffer = Buffer.alloc(36 + payload.length); - buffer.write(testSchemaIds[1], 4, 32, "utf-8"); - payload.copy(buffer, 36); - await assert.isRejected(serializer.deserialize(buffer), /not found/); - }); - - it("serializes to the expected format", async () => { - const registry = createTestRegistry(); - const schemaId = await registerTestSchema(registry); - const serializer = await createTestSerializer(false, registry); - const arr = await serializer.serialize(testValue, testSchema); - assert.isUndefined((arr as Buffer).readBigInt64BE); - const buffer = Buffer.from(arr); - assert.strictEqual(0x0, buffer.readUInt32BE(0)); - assert.strictEqual(schemaId, buffer.toString("utf-8", 4, 36)); - const payload = buffer.slice(36); - assert.deepStrictEqual(testAvroType.fromBuffer(payload), testValue); - }); - - it("deserializes from the expected format", async () => { - const registry = createTestRegistry(); - const schemaId = await registerTestSchema(registry); - const serializer = await createTestSerializer(false, registry); - const payload = testAvroType.toBuffer(testValue); - const buffer = Buffer.alloc(36 + payload.length); - - buffer.write(schemaId, 4, 32, "utf-8"); - payload.copy(buffer, 36); - assert.deepStrictEqual(await serializer.deserialize(buffer), testValue); - }); - - it("serializes and deserializes in round trip", async () => { - let serializer = await createTestSerializer(); - let buffer = await serializer.serialize(testValue, testSchema); - assert.deepStrictEqual(await serializer.deserialize(buffer), testValue); - - // again for cache hit coverage on serialize - buffer = await serializer.serialize(testValue, testSchema); - assert.deepStrictEqual(await serializer.deserialize(buffer), testValue); - 
- // throw away serializer for cache miss coverage on deserialize - serializer = await createTestSerializer(false); - assert.deepStrictEqual(await serializer.deserialize(buffer), testValue); - - // throw away serializer again and cover getSchemaProperties instead of registerSchema - serializer = await createTestSerializer(false); - assert.deepStrictEqual(await serializer.serialize(testValue, testSchema), buffer); - }); - - it("works with trivial example in README", async () => { - const serializer = await createTestSerializer(); - - // Example Avro schema - const schema = JSON.stringify({ - type: "record", - name: "Rating", - namespace: "my.example", - fields: [{ name: "score", type: "int" }], - }); - - // Example value that matches the Avro schema above - const value = { score: 42 }; - - // Serialize value to buffer - const buffer = await serializer.serialize(value, schema); - - // Deserialize buffer to value - const deserializedValue = await serializer.deserialize(buffer); - - assert.deepStrictEqual(deserializedValue, value); - }); -}); diff --git a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedSerializer.ts b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedEncoder.ts similarity index 72% rename from sdk/schemaregistry/schema-registry-avro/test/utils/mockedSerializer.ts rename to sdk/schemaregistry/schema-registry-avro/test/utils/mockedEncoder.ts index 709804cd1527..68f4a9d29944 100644 --- a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedSerializer.ts +++ b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedEncoder.ts @@ -1,19 +1,19 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. 
-import { SchemaRegistry } from "@azure/schema-registry"; -import { SchemaRegistryAvroSerializer } from "../../src/schemaRegistryAvroSerializer"; import { testGroup, testSchema, testSchemaObject } from "./dummies"; +import { SchemaRegistry } from "@azure/schema-registry"; +import { SchemaRegistryAvroEncoder } from "../../src/schemaRegistryAvroEncoder"; import { createTestRegistry } from "./mockedRegistryClient"; -export async function createTestSerializer( +export async function createTestEncoder( autoRegisterSchemas = true, registry = createTestRegistry() -): Promise { +): Promise { if (!autoRegisterSchemas) { await registerTestSchema(registry); } - return new SchemaRegistryAvroSerializer(registry, { autoRegisterSchemas, groupName: testGroup }); + return new SchemaRegistryAvroEncoder(registry, { autoRegisterSchemas, groupName: testGroup }); } export async function registerTestSchema(registry: SchemaRegistry): Promise { diff --git a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts index 14002caf9b65..d57c0cb78119 100644 --- a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts +++ b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts @@ -11,8 +11,8 @@ import { SchemaRegistry, SchemaRegistryClient, } from "@azure/schema-registry"; -import { ClientSecretCredential } from "@azure/identity"; import { env, isLiveMode } from "@azure-tools/test-recorder"; +import { ClientSecretCredential } from "@azure/identity"; import { testSchemaIds } from "./dummies"; export function createTestRegistry(neverLive = false): SchemaRegistry {