From b88c0bae297bc7588fdcf0e3d574b009318ece5e Mon Sep 17 00:00:00 2001 From: Deyaaeldeen Almahallawi Date: Fri, 28 Jan 2022 16:36:23 -0500 Subject: [PATCH] [Schema Registry Avro] Use LRU cache policy (#20108) * [Schema Registry Avro] Update cache policy * renames * use lru-cache * add tests * update changelog * update test * update test * update semver for @types/lru-cache --- common/config/rush/pnpm-lock.yaml | 18 +++++-- .../schema-registry-avro/CHANGELOG.md | 1 + .../schema-registry-avro/package.json | 6 ++- .../src/schemaRegistryAvroEncoder.ts | 52 +++++++++--------- .../test/schemaRegistryAvroEncoder.spec.ts | 53 +++++++++++++++++++ .../test/utils/mockedRegistryClient.ts | 5 +- 6 files changed, 102 insertions(+), 33 deletions(-) diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index fe7a5924a2e7..178396437691 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -70,6 +70,7 @@ specifiers: '@rush-temp/arm-iothub': file:./projects/arm-iothub.tgz '@rush-temp/arm-keyvault': file:./projects/arm-keyvault.tgz '@rush-temp/arm-kubernetesconfiguration': file:./projects/arm-kubernetesconfiguration.tgz + '@rush-temp/arm-kusto': file:./projects/arm-kusto.tgz '@rush-temp/arm-labservices': file:./projects/arm-labservices.tgz '@rush-temp/arm-links': file:./projects/arm-links.tgz '@rush-temp/arm-loadtestservice': file:./projects/arm-loadtestservice.tgz @@ -2091,6 +2092,10 @@ packages: resolution: {integrity: sha512-5tXH6Bx/kNGd3MgffdmP4dy2Z+G4eaXw0SE81Tq3BNadtnMR5/ySMzX4SLEzHJzSmPNn4HIdpQsBvXMUykr58w==} dev: false + /@types/lru-cache/5.1.1: + resolution: {integrity: sha512-ssE3Vlrys7sdIzs5LOxCzTVMsU7i9oa/IaW92wF32JFb3CVczqOkru2xspuKczHEbG3nvmPY7IFqVmGGHdNbYw==} + dev: false + /@types/md5/2.3.1: resolution: {integrity: sha512-OK3oe+ALIoPSo262lnhAYwpqFNXbiwH2a+0+Z5YBnkQEwWD8fk5+PIeRhYA48PzvX9I4SGNpWy+9bLj8qz92RQ==} dependencies: @@ -8365,7 +8370,7 @@ packages: dev: false file:projects/agrifood-farming.tgz: - resolution: {integrity: sha512-tIbV71Sekmb01b/mAU9iH2ZNEe2nbzNTiGPFmlNRnGLOAEXkxRYdLDbSR26JEGicN3V76kUlOxbLl3wkCLHY+Q==, tarball: file:projects/agrifood-farming.tgz} + resolution: {integrity: sha512-JpQ+CMsdS6Lq2UVvWVtusmxuJgFKl8wsN9B0hmDFA9ZEcFD9Cu4V3FlHw8/OvMbTecY+c7ImUUNvPIqhSHgE4Q==, tarball: file:projects/agrifood-farming.tgz} name: '@rush-temp/agrifood-farming' version: 0.0.0 dependencies: @@ -13125,7 +13130,7 @@ packages: dev: false file:projects/core-http.tgz: - resolution: {integrity: sha512-dr/zKAzLJeqdjfjldg208kPRFEMAH6gtQCxlOVWj3j6aBWvXZLWYoH6DirE5GNgn3uQc6sWDfI58qL4w6hqmNQ==, tarball: file:projects/core-http.tgz} + resolution: {integrity: sha512-hNWQdd9zZmCEndn+4Bl3G0/90xrJnP8QjWHnB7gbiAJTfQHSW1rePvYICZi2X3IW+mtuqsJod+5kD2DF3F92vg==, tarball: file:projects/core-http.tgz} name: '@rush-temp/core-http' version: 0.0.0 dependencies: @@ -15388,7 +15393,7 @@ packages: dev: false file:projects/schema-registry-avro.tgz: - resolution: {integrity: sha512-nYoV4QCq4gGBjhk2hz9opui4CbCRNv3bcVGgDVCIqiso9PbCyWb6CqNqsSZyT7F2NIpVfGSNtElbrVDgx/2i/g==, tarball: file:projects/schema-registry-avro.tgz} + resolution: {integrity: sha512-mrrtNCFDOLZ3DCxGpU9HuD0w0oveqTH8tTHPa4pnfNz+bhS8fgVA6bvQoT30/MrapBBJWSCNVybNWEVW3/BRZQ==, tarball: file:projects/schema-registry-avro.tgz} name: '@rush-temp/schema-registry-avro' version: 0.0.0 dependencies: @@ -15399,8 +15404,10 @@ packages: '@rollup/plugin-replace': 2.4.2_rollup@1.32.1 '@types/chai': 4.3.0 '@types/chai-as-promised': 7.1.4 + '@types/lru-cache': 5.1.1 '@types/mocha': 7.0.2 '@types/node': 12.20.42 + '@types/uuid': 8.3.4 avsc: 5.7.3 buffer: 6.0.3 chai: 4.3.4 @@ -15422,6 +15429,7 @@ packages: karma-mocha-reporter: 2.2.5_karma@6.3.11 karma-source-map-support: 1.4.0 karma-sourcemap-loader: 0.3.8 + lru-cache: 6.0.0 mocha: 7.2.0 mocha-junit-reporter: 2.0.2_mocha@7.2.0 nyc: 15.1.0 @@ -15432,6 +15440,7 @@ packages: source-map-support: 0.5.21 tslib: 2.3.1 typescript: 4.2.4 + uuid: 8.3.2 transitivePeerDependencies: - bufferutil - debug @@ -15441,7 +15450,7 @@ packages: dev: false file:projects/schema-registry.tgz: - resolution: {integrity: sha512-fucVewqXE8TZV5enQSHpfLRsx8+I+1w1Cec81+x0xHbAcEs0ykoXa7NCNYQMAPi+V0Pu99w+Hqmjsrbn1SK6yg==, tarball: file:projects/schema-registry.tgz} + resolution: {integrity: sha512-bC/6TQwVav+spG+JqNxPVKgzAgTI2cYjiU/y2+r3RXlD8Wv/w2p99jyDQGcB+nALLOPMDPUAszhc8iUgFUEwSw==, tarball: file:projects/schema-registry.tgz} name: '@rush-temp/schema-registry' version: 0.0.0 dependencies: @@ -15482,7 +15491,6 @@ packages: transitivePeerDependencies: - bufferutil - debug - - encoding - supports-color - utf-8-validate dev: false diff --git a/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md b/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md index aa4a585844a3..78b30a4d677a 100644 --- a/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md +++ b/sdk/schemaregistry/schema-registry-avro/CHANGELOG.md @@ -15,6 +15,7 @@ ### Bugs Fixed ### Other Changes +- The internal cache has been updated to be an LRU one with a max entries count of 128 ## 1.0.0-beta.5 (2021-11-17) diff --git a/sdk/schemaregistry/schema-registry-avro/package.json b/sdk/schemaregistry/schema-registry-avro/package.json index e481ba9a7bbc..afb7ae3126ed 100644 --- a/sdk/schemaregistry/schema-registry-avro/package.json +++ b/sdk/schemaregistry/schema-registry-avro/package.json @@ -74,6 +74,7 @@ "@azure/schema-registry": "1.0.2", "avsc": "^5.5.1", "buffer": "^6.0.0", + "lru-cache": "^6.0.0", "tslib": "^2.2.0" }, "devDependencies": { @@ -89,8 +90,10 @@ "@rollup/plugin-replace": "^2.2.0", "@types/chai": "^4.1.6", "@types/chai-as-promised": "^7.1.0", + "@types/lru-cache": "^5.1.1", "@types/mocha": "^7.0.2", "@types/node": "^12.0.0", + "@types/uuid": "^8.3.0", "chai": "^4.2.0", "chai-as-promised": "^7.1.1", "cross-env": "^7.0.2", @@ -118,6 +121,7 @@ "rollup": "^1.16.3", "rollup-plugin-shim": "^1.0.0", "source-map-support": "^0.5.9", - "typescript": "~4.2.0" + "typescript": "~4.2.0", + "uuid": "^8.3.0" } } diff --git a/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts b/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts index 7eb2126288b0..237a9d565ebf 100644 --- a/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts +++ b/sdk/schemaregistry/schema-registry-avro/src/schemaRegistryAvroEncoder.ts @@ -2,6 +2,8 @@ // Licensed under the MIT license. import * as avro from "avsc"; +import LRUCache from "lru-cache"; +import LRUCacheOptions = LRUCache.Options; import { DecodeMessageDataOptions, MessageAdapter, @@ -11,15 +13,20 @@ import { import { SchemaDescription, SchemaRegistry } from "@azure/schema-registry"; import { isMessageWithMetadata } from "./utility"; +type AVSCEncoder = avro.Type; + interface CacheEntry { /** Schema ID */ id: string; /** avsc-specific representation for schema */ - type: avro.Type; + encoder: AVSCEncoder; } const avroMimeType = "avro/binary"; +const cacheOptions: LRUCacheOptions = { + max: 128, +}; /** * Avro encoder that obtains schemas from a schema registry and does not @@ -43,11 +50,9 @@ export class SchemaRegistryAvroEncoder { private readonly registry: SchemaRegistry; private readonly autoRegisterSchemas: boolean; private readonly messageAdapter?: MessageAdapter; + private readonly cacheBySchemaDefinition = new LRUCache(cacheOptions); + private readonly cacheById = new LRUCache(cacheOptions); - // REVIEW: signature. - // - // - Should we wrap all errors thrown by avsc to avoid having our exception // - // contract being tied to its implementation details? /** * encodes the value parameter according to the input schema and creates a message * with the encoded data. @@ -59,7 +64,7 @@ export class SchemaRegistryAvroEncoder { */ async encodeMessageData(value: unknown, schema: string): Promise { const entry = await this.getSchemaByDefinition(schema); - const buffer = entry.type.toBuffer(value); + const buffer = entry.encoder.toBuffer(value); const payload = new Uint8Array( buffer.buffer, buffer.byteOffset, @@ -98,20 +103,17 @@ export class SchemaRegistryAvroEncoder { const { body, contentType } = convertMessage(message, this.messageAdapter); const buffer = Buffer.from(body); const writerSchemaId = getSchemaId(contentType); - const writerSchema = await this.getSchema(writerSchemaId); + const writerSchemaEncoder = await this.getSchemaById(writerSchemaId); if (readerSchema) { - const avscReaderSchema = this.getAvroTypeForSchema(readerSchema); - const resolver = avscReaderSchema.createResolver(writerSchema.type); - return avscReaderSchema.fromBuffer(buffer, resolver, true); + const readerSchemaEncoder = getEncoderForSchema(readerSchema); + const resolver = readerSchemaEncoder.createResolver(writerSchemaEncoder); + return readerSchemaEncoder.fromBuffer(buffer, resolver, true); } else { - return writerSchema.type.fromBuffer(buffer); + return writerSchemaEncoder.fromBuffer(buffer); } } - private readonly cacheBySchemaDefinition = new Map(); - private readonly cacheById = new Map(); - - private async getSchema(schemaId: string): Promise { + private async getSchemaById(schemaId: string): Promise { const cached = this.cacheById.get(schemaId); if (cached) { return cached; @@ -128,8 +130,8 @@ export class SchemaRegistryAvroEncoder { ); } - const avroType = this.getAvroTypeForSchema(schemaResponse.definition); - return this.cache(schemaId, schemaResponse.definition, avroType); + const avroType = getEncoderForSchema(schemaResponse.definition); + return this.cache(schemaId, schemaResponse.definition, avroType).encoder; } private async getSchemaByDefinition(schema: string): Promise { @@ -138,7 +140,7 @@ export class SchemaRegistryAvroEncoder { return cached; } - const avroType = this.getAvroTypeForSchema(schema); + const avroType = getEncoderForSchema(schema); if (!avroType.name) { throw new Error("Schema must have a name."); } @@ -176,16 +178,12 @@ export class SchemaRegistryAvroEncoder { return this.cache(id, schema, avroType); } - private cache(id: string, schema: string, type: avro.Type): CacheEntry { - const entry = { id, type }; + private cache(id: string, schema: string, encoder: AVSCEncoder): CacheEntry { + const entry = { id, encoder }; this.cacheBySchemaDefinition.set(schema, entry); - this.cacheById.set(id, entry); + this.cacheById.set(id, encoder); return entry; } - - private getAvroTypeForSchema(schema: string): avro.Type { - return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true }); - } } function getSchemaId(contentType: string): string { @@ -261,3 +259,7 @@ function tryReadingPreambleFormat(buffer: Buffer): MessageWithMetadata { contentType: `${avroMimeType}+${schemaId}`, }; } + +function getEncoderForSchema(schema: string): AVSCEncoder { + return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true }); +} diff --git a/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts b/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts index 0969d094ad5e..9224437cf87a 100644 --- a/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts +++ b/sdk/schemaregistry/schema-registry-avro/test/schemaRegistryAvroEncoder.spec.ts @@ -78,6 +78,13 @@ describe("SchemaRegistryAvroEncoder", function () { const buffer = Buffer.from(message.body); assert.strictEqual(`avro/binary+${schemaId}`, message.contentType); assert.deepStrictEqual(testAvroType.fromBuffer(buffer), testValue); + assert.equal(encoder["cacheById"].length, 1); + assert.equal( + encoder["cacheById"].peek(schemaId)?.name, + "com.azure.schemaregistry.samples.AvroUser" + ); + assert.equal(encoder["cacheBySchemaDefinition"].length, 1); + assert.equal(encoder["cacheBySchemaDefinition"].peek(testSchema)?.id, schemaId); }); it("decodes from the expected format", async () => { @@ -207,4 +214,50 @@ describe("SchemaRegistryAvroEncoder", function () { testValue ); }); + + it("cache size growth is bounded", async () => { + function makeRndStr(length: number): string { + let result = ""; + const characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"; + for (let i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * characters.length)); + } + return result; + } + + const registry = createTestRegistry(); + const encoder = await createTestEncoder(true, registry); + const entriesMaxCount = encoder["cacheById"].max; + const itersCount = 2 * entriesMaxCount; + assert.isAtLeast(itersCount, entriesMaxCount + 1); + let i = 0; + for (; i < itersCount; ++i) { + const field1 = makeRndStr(10); + const field2 = makeRndStr(10); + const valueToBeEncoded = JSON.parse(`{ "${field1}": "Nick", "${field2}": 42 }`); + const schemaToEncodeWith = JSON.stringify({ + type: "record", + name: makeRndStr(8), + namespace: "com.azure.schemaregistry.samples", + fields: [ + { + name: field1, + type: "string", + }, + { + name: field2, + type: "int", + }, + ], + }); + await encoder.encodeMessageData(valueToBeEncoded, schemaToEncodeWith); + if (i < entriesMaxCount) { + assert.equal(encoder["cacheById"].length, i + 1); + assert.equal(encoder["cacheBySchemaDefinition"].length, i + 1); + } else { + assert.equal(encoder["cacheById"].length, entriesMaxCount); + assert.equal(encoder["cacheBySchemaDefinition"].length, entriesMaxCount); + } + } + }); }); diff --git a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts index d57c0cb78119..230996e1d2e5 100644 --- a/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts +++ b/sdk/schemaregistry/schema-registry-avro/test/utils/mockedRegistryClient.ts @@ -14,6 +14,7 @@ import { import { env, isLiveMode } from "@azure-tools/test-recorder"; import { ClientSecretCredential } from "@azure/identity"; import { testSchemaIds } from "./dummies"; +import { v4 as uuid } from "uuid"; export function createTestRegistry(neverLive = false): SchemaRegistry { if (!neverLive && isLiveMode()) { @@ -50,8 +51,8 @@ export function createTestRegistry(neverLive = false): SchemaRegistry { return result!.properties; function newId(): string { - if (idCounter === testSchemaIds.length) { - throw new Error("Out of IDs. Generate more GUIDs and paste them above."); + if (idCounter >= testSchemaIds.length) { + return uuid(); } const id = testSchemaIds[idCounter]; idCounter++;