[Schema Registry Avro] Use LRU cache policy (#20108)
* [Schema Registry Avro] Update cache policy

* renames

* use lru-cache

* add tests

* update changelog

* update test

* update test

* update semver for @types/lru-cache
deyaaeldeen authored Jan 28, 2022
1 parent 55ad309 commit b88c0ba
Showing 6 changed files with 102 additions and 33 deletions.
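For orientation before the diffs, here is a minimal standalone sketch (not part of the commit) of the eviction behavior the encoder now relies on, assuming lru-cache v6 (the version this commit adds); the keys and values are illustrative only:

import LRUCache from "lru-cache";

// A cache capped at 128 entries: once full, each insert evicts the
// least recently used entry, so memory use stays bounded.
const cache = new LRUCache<string, string>({ max: 128 });

for (let i = 0; i < 200; i++) {
  cache.set(`schema-${i}`, `definition ${i}`);
}

console.log(cache.length); // 128
console.log(cache.has("schema-0")); // false: evicted as least recently used
console.log(cache.has("schema-199")); // true: inserted last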
18 changes: 13 additions & 5 deletions common/config/rush/pnpm-lock.yaml

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions sdk/schemaregistry/schema-registry-avro/CHANGELOG.md
@@ -15,6 +15,7 @@
### Bugs Fixed

### Other Changes
+- The internal cache has been updated to an LRU (least recently used) cache capped at 128 entries

## 1.0.0-beta.5 (2021-11-17)

6 changes: 5 additions & 1 deletion sdk/schemaregistry/schema-registry-avro/package.json
@@ -74,6 +74,7 @@
"@azure/schema-registry": "1.0.2",
"avsc": "^5.5.1",
"buffer": "^6.0.0",
"lru-cache": "^6.0.0",
"tslib": "^2.2.0"
},
"devDependencies": {
@@ -89,8 +90,10 @@
"@rollup/plugin-replace": "^2.2.0",
"@types/chai": "^4.1.6",
"@types/chai-as-promised": "^7.1.0",
"@types/lru-cache": "^5.1.1",
"@types/mocha": "^7.0.2",
"@types/node": "^12.0.0",
"@types/uuid": "^8.3.0",
"chai": "^4.2.0",
"chai-as-promised": "^7.1.1",
"cross-env": "^7.0.2",
@@ -118,6 +121,7 @@
"rollup": "^1.16.3",
"rollup-plugin-shim": "^1.0.0",
"source-map-support": "^0.5.9",
"typescript": "~4.2.0"
"typescript": "~4.2.0",
"uuid": "^8.3.0"
}
}
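A side note on the new dependencies: lru-cache v6 ships without bundled type declarations, hence the separate @types/lru-cache package. A minimal sketch of the usage those typings enable, mirroring the imports added to the encoder below:

import LRUCache from "lru-cache";
import LRUCacheOptions = LRUCache.Options; // namespace-style alias from @types/lru-cache

// Both the options and the cache are generic over key and value types.
const options: LRUCacheOptions<string, number> = { max: 128 };
const cache = new LRUCache<string, number>(options);
cache.set("answer", 42);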
@@ -2,6 +2,8 @@
// Licensed under the MIT license.

import * as avro from "avsc";
+import LRUCache from "lru-cache";
+import LRUCacheOptions = LRUCache.Options;
import {
DecodeMessageDataOptions,
MessageAdapter,
@@ -11,15 +13,20 @@ import {
import { SchemaDescription, SchemaRegistry } from "@azure/schema-registry";
import { isMessageWithMetadata } from "./utility";

+type AVSCEncoder = avro.Type;
+
interface CacheEntry {
/** Schema ID */
id: string;

/** avsc-specific representation for schema */
-  type: avro.Type;
+  encoder: AVSCEncoder;
}

const avroMimeType = "avro/binary";
+const cacheOptions: LRUCacheOptions<string, any> = {
+  max: 128,
+};

/**
* Avro encoder that obtains schemas from a schema registry and does not
@@ -43,11 +50,9 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
private readonly registry: SchemaRegistry;
private readonly autoRegisterSchemas: boolean;
private readonly messageAdapter?: MessageAdapter<MessageT>;
+  private readonly cacheBySchemaDefinition = new LRUCache<string, CacheEntry>(cacheOptions);
+  private readonly cacheById = new LRUCache<string, AVSCEncoder>(cacheOptions);

-  // REVIEW: signature.
-  //
-  // - Should we wrap all errors thrown by avsc to avoid having our exception
-  //   contract being tied to its implementation details?
/**
* encodes the value parameter according to the input schema and creates a message
* with the encoded data.
@@ -59,7 +64,7 @@
*/
async encodeMessageData(value: unknown, schema: string): Promise<MessageT> {
const entry = await this.getSchemaByDefinition(schema);
-    const buffer = entry.type.toBuffer(value);
+    const buffer = entry.encoder.toBuffer(value);
const payload = new Uint8Array(
buffer.buffer,
buffer.byteOffset,
@@ -98,20 +103,17 @@
const { body, contentType } = convertMessage(message, this.messageAdapter);
const buffer = Buffer.from(body);
const writerSchemaId = getSchemaId(contentType);
-    const writerSchema = await this.getSchema(writerSchemaId);
+    const writerSchemaEncoder = await this.getSchemaById(writerSchemaId);
if (readerSchema) {
-      const avscReaderSchema = this.getAvroTypeForSchema(readerSchema);
-      const resolver = avscReaderSchema.createResolver(writerSchema.type);
-      return avscReaderSchema.fromBuffer(buffer, resolver, true);
+      const readerSchemaEncoder = getEncoderForSchema(readerSchema);
+      const resolver = readerSchemaEncoder.createResolver(writerSchemaEncoder);
+      return readerSchemaEncoder.fromBuffer(buffer, resolver, true);
} else {
-      return writerSchema.type.fromBuffer(buffer);
+      return writerSchemaEncoder.fromBuffer(buffer);
}
}

-  private readonly cacheBySchemaDefinition = new Map<string, CacheEntry>();
-  private readonly cacheById = new Map<string, CacheEntry>();
-
-  private async getSchema(schemaId: string): Promise<CacheEntry> {
+  private async getSchemaById(schemaId: string): Promise<AVSCEncoder> {
const cached = this.cacheById.get(schemaId);
if (cached) {
return cached;
@@ -128,8 +130,8 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
);
}

-    const avroType = this.getAvroTypeForSchema(schemaResponse.definition);
-    return this.cache(schemaId, schemaResponse.definition, avroType);
+    const avroType = getEncoderForSchema(schemaResponse.definition);
+    return this.cache(schemaId, schemaResponse.definition, avroType).encoder;
}

private async getSchemaByDefinition(schema: string): Promise<CacheEntry> {
@@ -138,7 +140,7 @@
return cached;
}

-    const avroType = this.getAvroTypeForSchema(schema);
+    const avroType = getEncoderForSchema(schema);
if (!avroType.name) {
throw new Error("Schema must have a name.");
}
@@ -176,16 +178,12 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
return this.cache(id, schema, avroType);
}

-  private cache(id: string, schema: string, type: avro.Type): CacheEntry {
-    const entry = { id, type };
+  private cache(id: string, schema: string, encoder: AVSCEncoder): CacheEntry {
+    const entry = { id, encoder };
    this.cacheBySchemaDefinition.set(schema, entry);
-    this.cacheById.set(id, entry);
+    this.cacheById.set(id, encoder);
return entry;
}

-  private getAvroTypeForSchema(schema: string): avro.Type {
-    return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true });
-  }
}

function getSchemaId(contentType: string): string {
@@ -261,3 +259,7 @@ function tryReadingPreambleFormat(buffer: Buffer): MessageWithMetadata {
contentType: `${avroMimeType}+${schemaId}`,
};
}

+function getEncoderForSchema(schema: string): AVSCEncoder {
+  return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true });
+}
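One design point worth calling out in the diff above: the encoder keeps two LRU caches, cacheBySchemaDefinition (hit on the encode path, keyed by schema text) and cacheById (hit on the decode path, keyed by schema ID). Each is bounded at 128 entries and evicts independently, so an entry can age out of one cache while surviving in the other. A simplified sketch of the pattern, with a stand-in encoder type:

import LRUCache from "lru-cache";

// Stand-in for avro.Type; illustrative only.
type Encoder = { name?: string };
interface Entry { id: string; encoder: Encoder }

const cacheBySchemaDefinition = new LRUCache<string, Entry>({ max: 128 });
const cacheById = new LRUCache<string, Encoder>({ max: 128 });

// Mirrors the private cache() method: one write populates both lookups.
function cache(id: string, definition: string, encoder: Encoder): Entry {
  const entry = { id, encoder };
  cacheBySchemaDefinition.set(definition, entry); // encode path lookup
  cacheById.set(id, encoder); // decode path lookup
  return entry;
}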
@@ -78,6 +78,13 @@ describe("SchemaRegistryAvroEncoder", function () {
const buffer = Buffer.from(message.body);
assert.strictEqual(`avro/binary+${schemaId}`, message.contentType);
assert.deepStrictEqual(testAvroType.fromBuffer(buffer), testValue);
+    assert.equal(encoder["cacheById"].length, 1);
+    assert.equal(
+      encoder["cacheById"].peek(schemaId)?.name,
+      "com.azure.schemaregistry.samples.AvroUser"
+    );
+    assert.equal(encoder["cacheBySchemaDefinition"].length, 1);
+    assert.equal(encoder["cacheBySchemaDefinition"].peek(testSchema)?.id, schemaId);
});

it("decodes from the expected format", async () => {
@@ -207,4 +214,50 @@ describe("SchemaRegistryAvroEncoder", function () {
testValue
);
});

it("cache size growth is bounded", async () => {
function makeRndStr(length: number): string {
let result = "";
const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
for (let i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * characters.length));
}
return result;
}

const registry = createTestRegistry();
const encoder = await createTestEncoder(true, registry);
const entriesMaxCount = encoder["cacheById"].max;
const itersCount = 2 * entriesMaxCount;
assert.isAtLeast(itersCount, entriesMaxCount + 1);
let i = 0;
for (; i < itersCount; ++i) {
const field1 = makeRndStr(10);
const field2 = makeRndStr(10);
const valueToBeEncoded = JSON.parse(`{ "${field1}": "Nick", "${field2}": 42 }`);
const schemaToEncodeWith = JSON.stringify({
type: "record",
name: makeRndStr(8),
namespace: "com.azure.schemaregistry.samples",
fields: [
{
name: field1,
type: "string",
},
{
name: field2,
type: "int",
},
],
});
await encoder.encodeMessageData(valueToBeEncoded, schemaToEncodeWith);
if (i < entriesMaxCount) {
assert.equal(encoder["cacheById"].length, i + 1);
assert.equal(encoder["cacheBySchemaDefinition"].length, i + 1);
} else {
assert.equal(encoder["cacheById"].length, entriesMaxCount);
assert.equal(encoder["cacheBySchemaDefinition"].length, entriesMaxCount);
}
}
});
});
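Note that the assertions above read cache entries with peek() rather than get(): in lru-cache, get() marks an entry as recently used, which would perturb the eviction order the test is trying to observe, while peek() reads without refreshing recency. A small sketch of the difference, assuming lru-cache v6 semantics:

import LRUCache from "lru-cache";

const cache = new LRUCache<string, number>({ max: 2 });
cache.set("a", 1);
cache.set("b", 2);

cache.peek("a"); // reads "a" without marking it recently used
cache.set("c", 3); // evicts "a", still the least recently used

console.log(cache.has("a")); // false
// Calling cache.get("a") instead of peek() would have refreshed "a",
// making "b" the eviction victim instead.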
@@ -14,6 +14,7 @@ import {
import { env, isLiveMode } from "@azure-tools/test-recorder";
import { ClientSecretCredential } from "@azure/identity";
import { testSchemaIds } from "./dummies";
+import { v4 as uuid } from "uuid";

export function createTestRegistry(neverLive = false): SchemaRegistry {
if (!neverLive && isLiveMode()) {
@@ -50,8 +51,8 @@ export function createTestRegistry(neverLive = false): SchemaRegistry {
return result!.properties;

function newId(): string {
-    if (idCounter === testSchemaIds.length) {
-      throw new Error("Out of IDs. Generate more GUIDs and paste them above.");
+    if (idCounter >= testSchemaIds.length) {
+      return uuid();
}
const id = testSchemaIds[idCounter];
idCounter++;
