[Schema Registry Avro] Use LRU cache policy #20108
Changes from all commits: d325cbe, 1348958, bd6c276, e7a689a, 9078d0c, 16b336e, 94b896d, e8dba94
```diff
@@ -2,6 +2,8 @@
 // Licensed under the MIT license.
 
 import * as avro from "avsc";
+import LRUCache from "lru-cache";
+import LRUCacheOptions = LRUCache.Options;
 import {
   DecodeMessageDataOptions,
   MessageAdapter,
```
```diff
@@ -11,15 +13,20 @@
 import { SchemaDescription, SchemaRegistry } from "@azure/schema-registry";
 import { isMessageWithMetadata } from "./utility";
 
+type AVSCEncoder = avro.Type;
+
 interface CacheEntry {
   /** Schema ID */
   id: string;
 
   /** avsc-specific representation for schema */
-  type: avro.Type;
+  encoder: AVSCEncoder;
 }
 
 const avroMimeType = "avro/binary";
+const cacheOptions: LRUCacheOptions<string, any> = {
+  max: 128,
+};
 
 /**
  * Avro encoder that obtains schemas from a schema registry and does not
```

Review thread on the `encoder: AVSCEncoder;` line:

> Nit: this is really the only part of the PR that tripped me up. It's clearly just a lexical change, but I can't help but wonder why we went from `avro.Type` to `AVSCEncoder` here.

> This is basically the encoder for a specific schema that we use to encode and decode values, and I renamed it to make it more readable. I remember when I first read this code, the …
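The `max: 128` option caps the number of cached entries; once the cap is exceeded, lru-cache evicts the least-recently-used entry. The following is a minimal, self-contained sketch of that policy (a hypothetical stand-in for the real lru-cache package, relying on `Map`'s insertion-order iteration), not the library's actual implementation:

```typescript
// SimpleLRU: hypothetical illustration of an LRU eviction policy.
// A Map iterates keys in insertion order, so the first key is the
// least recently used one as long as we re-insert keys on access.
class SimpleLRU<K, V> {
  private map = new Map<K, V>();
  constructor(private readonly max: number) {}

  get(key: K): V | undefined {
    const value = this.map.get(key);
    if (value !== undefined) {
      // Refresh recency: delete and re-insert so the key moves to the end.
      this.map.delete(key);
      this.map.set(key, value);
    }
    return value;
  }

  set(key: K, value: V): void {
    this.map.delete(key);
    this.map.set(key, value);
    if (this.map.size > this.max) {
      // The first key in iteration order is the least recently used.
      const oldest = this.map.keys().next().value as K;
      this.map.delete(oldest);
    }
  }

  has(key: K): boolean {
    return this.map.has(key);
  }
}

const cache = new SimpleLRU<string, string>(2);
cache.set("a", "1");
cache.set("b", "2");
cache.get("a"); // touch "a", so "b" becomes least recently used
cache.set("c", "3"); // exceeds max of 2, evicting "b"
```

With `max: 128` as in the PR, schema entries beyond the 128 most recently used are dropped rather than accumulating unboundedly, which is the motivation for replacing the plain `Map` caches.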
```diff
@@ -43,11 +50,9 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
   private readonly registry: SchemaRegistry;
   private readonly autoRegisterSchemas: boolean;
   private readonly messageAdapter?: MessageAdapter<MessageT>;
+  private readonly cacheBySchemaDefinition = new LRUCache<string, CacheEntry>(cacheOptions);
+  private readonly cacheById = new LRUCache<string, AVSCEncoder>(cacheOptions);
 
-  // REVIEW: signature.
-  //
-  // - Should we wrap all errors thrown by avsc to avoid having our exception
-  //   contract being tied to its implementation details?
   /**
    * encodes the value parameter according to the input schema and creates a message
    * with the encoded data.
```

Review thread on removed lines 47 to 50:

> This is tracked by #20072 already.
```diff
@@ -59,7 +64,7 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
    */
   async encodeMessageData(value: unknown, schema: string): Promise<MessageT> {
     const entry = await this.getSchemaByDefinition(schema);
-    const buffer = entry.type.toBuffer(value);
+    const buffer = entry.encoder.toBuffer(value);
     const payload = new Uint8Array(
       buffer.buffer,
       buffer.byteOffset,
```
```diff
@@ -98,20 +103,17 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
     const { body, contentType } = convertMessage(message, this.messageAdapter);
     const buffer = Buffer.from(body);
     const writerSchemaId = getSchemaId(contentType);
-    const writerSchema = await this.getSchema(writerSchemaId);
+    const writerSchemaEncoder = await this.getSchemaById(writerSchemaId);
     if (readerSchema) {
-      const avscReaderSchema = this.getAvroTypeForSchema(readerSchema);
-      const resolver = avscReaderSchema.createResolver(writerSchema.type);
-      return avscReaderSchema.fromBuffer(buffer, resolver, true);
+      const readerSchemaEncoder = getEncoderForSchema(readerSchema);
+      const resolver = readerSchemaEncoder.createResolver(writerSchemaEncoder);
+      return readerSchemaEncoder.fromBuffer(buffer, resolver, true);
     } else {
-      return writerSchema.type.fromBuffer(buffer);
+      return writerSchemaEncoder.fromBuffer(buffer);
     }
   }
 
-  private readonly cacheBySchemaDefinition = new Map<string, CacheEntry>();
-  private readonly cacheById = new Map<string, CacheEntry>();
-
-  private async getSchema(schemaId: string): Promise<CacheEntry> {
+  private async getSchemaById(schemaId: string): Promise<AVSCEncoder> {
     const cached = this.cacheById.get(schemaId);
     if (cached) {
       return cached;
```
```diff
@@ -128,8 +130,8 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
       );
     }
 
-    const avroType = this.getAvroTypeForSchema(schemaResponse.definition);
-    return this.cache(schemaId, schemaResponse.definition, avroType);
+    const avroType = getEncoderForSchema(schemaResponse.definition);
+    return this.cache(schemaId, schemaResponse.definition, avroType).encoder;
   }
 
   private async getSchemaByDefinition(schema: string): Promise<CacheEntry> {
```
```diff
@@ -138,7 +140,7 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
       return cached;
     }
 
-    const avroType = this.getAvroTypeForSchema(schema);
+    const avroType = getEncoderForSchema(schema);
     if (!avroType.name) {
       throw new Error("Schema must have a name.");
     }
```
```diff
@@ -176,16 +178,12 @@ export class SchemaRegistryAvroEncoder<MessageT = MessageWithMetadata> {
     return this.cache(id, schema, avroType);
   }
 
-  private cache(id: string, schema: string, type: avro.Type): CacheEntry {
-    const entry = { id, type };
+  private cache(id: string, schema: string, encoder: AVSCEncoder): CacheEntry {
+    const entry = { id, encoder };
     this.cacheBySchemaDefinition.set(schema, entry);
-    this.cacheById.set(id, entry);
+    this.cacheById.set(id, encoder);
     return entry;
   }
-
-  private getAvroTypeForSchema(schema: string): avro.Type {
-    return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true });
-  }
 }
 
 function getSchemaId(contentType: string): string {
```
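The `cache()` method above maintains two views of the same data. A self-contained sketch of that dual-cache bookkeeping, using plain `Map`s in place of `LRUCache` and a hypothetical `Encoder` type standing in for avsc's `avro.Type`:

```typescript
// Encoder is a hypothetical stand-in for avsc's avro.Type.
type Encoder = { name: string };

interface CacheEntry {
  id: string;
  encoder: Encoder;
}

const cacheBySchemaDefinition = new Map<string, CacheEntry>();
const cacheById = new Map<string, Encoder>();

function cache(id: string, schema: string, encoder: Encoder): CacheEntry {
  const entry = { id, encoder };
  // The definition-keyed cache stores the full entry, because a lookup by
  // schema definition still needs to recover the schema ID. The ID-keyed
  // cache stores only the encoder, since the ID is already the key.
  cacheBySchemaDefinition.set(schema, entry);
  cacheById.set(id, encoder);
  return entry;
}

const enc: Encoder = { name: "User" };
const entry = cache("abc-123", '{"type":"record","name":"User"}', enc);
```

This mirrors why the PR changes `cacheById` from `Map<string, CacheEntry>` to `LRUCache<string, AVSCEncoder>`: the entry's `id` field would duplicate the key.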
```diff
@@ -261,3 +259,7 @@ function tryReadingPreambleFormat(buffer: Buffer): MessageWithMetadata {
     contentType: `${avroMimeType}+${schemaId}`,
   };
 }
+
+function getEncoderForSchema(schema: string): AVSCEncoder {
+  return avro.Type.forSchema(JSON.parse(schema), { omitRecordMethods: true });
+}
```
```diff
@@ -14,6 +14,7 @@ import {
 import { env, isLiveMode } from "@azure-tools/test-recorder";
 import { ClientSecretCredential } from "@azure/identity";
 import { testSchemaIds } from "./dummies";
+import { v4 as uuid } from "uuid";
 
 export function createTestRegistry(neverLive = false): SchemaRegistry {
   if (!neverLive && isLiveMode()) {
@@ -50,8 +51,8 @@ export function createTestRegistry(neverLive = false): SchemaRegistry {
     return result!.properties;
 
   function newId(): string {
-    if (idCounter === testSchemaIds.length) {
-      throw new Error("Out of IDs. Generate more GUIDs and paste them above.");
+    if (idCounter >= testSchemaIds.length) {
+      return uuid();
     }
     const id = testSchemaIds[idCounter];
     idCounter++;
```

Review thread on the `return uuid();` line:

> I guess I'm wondering a bit why we have any pre-generated GUIDs in the tests unless we rely on specific GUIDs.

> Fixed. The pre-generated GUIDs were me and I forget my reason. LOL. I would agree that if it's OK to generate some, it should be OK to generate all. I wouldn't keep a mix.

> It is my understanding that the hard-coded ones are meant to serve as already-registered schemas, so some tests can call decode without having to call encode or register the schema explicitly first. I feel it is reasonable to keep the hard-coded ones on that basis, though it could make reading the tests a bit harder. I can look into refactoring this in another PR.

> Ah, yes, that seems to ring a bell.
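The `newId()` fallback can be sketched in isolation. This version uses Node's built-in `crypto.randomUUID` rather than the `uuid` package the PR imports, and a stand-in list in place of `testSchemaIds` from `./dummies`:

```typescript
import { randomUUID } from "crypto";

// Stand-in for the pre-generated IDs imported from ./dummies in the PR.
const testSchemaIds = ["id-one", "id-two"];
let idCounter = 0;

function newId(): string {
  if (idCounter >= testSchemaIds.length) {
    // Ran out of pre-generated IDs: generate a fresh one instead of throwing.
    return randomUUID();
  }
  const id = testSchemaIds[idCounter];
  idCounter++;
  return id;
}

const first = newId(); // "id-one"
const second = newId(); // "id-two"
const generated = newId(); // list exhausted, so a random UUID
```

The design choice under discussion in the thread above is exactly this: hand out hard-coded IDs first (for tests that expect already-registered schemas), then fall back to generated ones instead of failing.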
Review thread on the new `uuid` dependency:

> One small thing to look at: I know we have a list of "blessed" dependencies and a guideline to avoid other external dependencies, but I don't know how up to date this guideline is, so it's up to you if you want to follow up on it: https://azure.github.io/azure-sdk/typescript_implementation.html#ts-dependencies-no-other-packages

> This one's a pretty core JS ecosystem tool. It has 66M weekly downloads (4x more than React).

> I'm good with this one; it's already in our graph in several places.