From 641187efbfdced5baa83a7f38e1955f68e0d6a7e Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Fri, 17 Nov 2023 21:31:48 +0100 Subject: [PATCH] refactor(broker): new easier cache API add also BrokerCacheItem.exists(). Rework cache API to use multi-level caches: L1 is in-memory, and L2 is on the broker server (deepkit broker, Redis, ...). L1 is synced via invalidation bus messages. refactor(logger): rename warning to warn to be consistent with console.* API --- package-lock.json | 11 + packages/broker/package.json | 3 + .../broker/src/adapters/deepkit-adapter.ts | 74 +++- packages/broker/src/broker.ts | 373 +++++++++++++----- packages/broker/src/kernel.ts | 110 +++++- packages/broker/src/model.ts | 40 +- packages/broker/tests/broker.spec.ts | 70 ++-- packages/framework/src/application-server.ts | 14 +- packages/http/src/http.ts | 2 +- packages/logger/src/logger.ts | 17 +- tslint.json | 2 +- 11 files changed, 564 insertions(+), 152 deletions(-) diff --git a/package-lock.json b/package-lock.json index f22552f9e..57c1813ab 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6429,6 +6429,14 @@ "node": ">=10" } }, + "node_modules/@lukeed/ms": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@lukeed/ms/-/ms-2.0.1.tgz", + "integrity": "sha512-Xs/4RZltsAL7pkvaNStUQt7netTkyxrS0K+RILcVr3TRMS/ToOg4I6uNfhB9SlGsnWBym4U+EaXq0f0cEMNkHA==", + "engines": { + "node": ">=8" + } + }, "node_modules/@marcj/ts-clone-node": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/@marcj/ts-clone-node/-/ts-clone-node-2.2.0.tgz", @@ -29983,6 +29991,9 @@ "name": "@deepkit/broker", "version": "1.0.1-alpha.105", "license": "MIT", + "dependencies": { + "@lukeed/ms": "^2.0.1" + }, "devDependencies": { "@deepkit/bson": "^1.0.1-alpha.105", "@deepkit/core": "^1.0.1-alpha.105", diff --git a/packages/broker/package.json b/packages/broker/package.json index 5de81d241..489eea70e 100644 --- a/packages/broker/package.json +++ b/packages/broker/package.json @@ -21,6 +21,9 @@ "scripts": { "build": "echo '{\"type\": \"module\"}' > ./dist/esm/package.json" }, + "dependencies": { + "@lukeed/ms": "^2.0.1" + }, "peerDependencies": { "@deepkit/app": "^1.0.1-alpha.40", "@deepkit/bson": "^1.0.1-alpha.40", diff --git a/packages/broker/src/adapters/deepkit-adapter.ts b/packages/broker/src/adapters/deepkit-adapter.ts index d77280a82..9ec73c549 100644 --- a/packages/broker/src/adapters/deepkit-adapter.ts +++ b/packages/broker/src/adapters/deepkit-adapter.ts @@ -1,11 +1,14 @@ -import { BrokerAdapter, BrokerCacheOptions, BrokerLockOptions, BrokerQueueMessage, Release } from '../broker.js'; +import { BrokerAdapter, BrokerCacheItemOptionsResolved, BrokerQueueMessage, BrokerTimeOptionsResolved, Release } from '../broker.js'; import { getTypeJitContainer, ReflectionKind, Type, TypePropertySignature } from '@deepkit/type'; import { brokerBusPublish, brokerBusResponseHandleMessage, brokerBusSubscribe, brokerGet, + brokerGetCache, brokerIncrement, + brokerInvalidateCache, + brokerInvalidateCacheMessage, brokerLock, brokerLockId, BrokerQueueMessageHandled, @@ -13,9 +16,12 @@ import { BrokerQueueResponseHandleMessage, BrokerQueueSubscribe, BrokerQueueUnsubscribe, + brokerResponseGetCache, + brokerResponseGetCacheMeta, brokerResponseIncrement, brokerResponseIsLock, brokerSet, + brokerSetCache, BrokerType } from '../model.js'; import { ClientTransportAdapter, createRpcMessage, RpcBaseClient, RpcMessage, RpcMessageRouteType, RpcWebSocketClientAdapter } from '@deepkit/rpc'; @@ -67,6 +73,8 @@ export class BrokerDeepkitConnection extends RpcBaseClient { activeChannels = new Map void)[] }>(); consumers = new Map void)[] }>(); + subscribedToInvalidations?: ((message: brokerInvalidateCacheMessage) => void)[]; + protected onMessage(message: RpcMessage) { if (message.routeType === RpcMessageRouteType.server) { if (message.type === BrokerType.EntityFields) { @@ -78,6 +86,11 @@ export class BrokerDeepkitConnection extends RpcBaseClient { const channel = this.activeChannels.get(body.c); if (!channel) return; for (const callback of channel.callbacks) callback(body.v); + } else if (message.type === BrokerType.ResponseInvalidationCache) { + const body = message.parseBody(); + if (this.subscribedToInvalidations) { + for (const callback of this.subscribedToInvalidations) callback(body); + } } else if (message.type === BrokerType.QueueResponseHandleMessage) { const body = message.parseBody(); const consumer = this.consumers.get(body.c); @@ -140,6 +153,7 @@ export class BrokerDeepkitPool { */ export class BrokerDeepkitAdapter implements BrokerAdapter { protected pool = new BrokerDeepkitPool(this.options); + protected onInvalidateCacheCallbacks: ((message: brokerInvalidateCacheMessage) => void)[] = []; constructor(public options: BrokerDeepkitAdapterOptions) { } @@ -148,14 +162,54 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { await this.pool.disconnect(); } - async setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise { + onInvalidateCache(callback: (message: brokerInvalidateCacheMessage) => void): void { + this.onInvalidateCacheCallbacks.push(callback); + } + + async invalidateCache(key: string): Promise { + await this.pool.getConnection('cache/' + key).sendMessage(BrokerType.InvalidateCache, { n: key }).ackThenClose(); + } + + async setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise { const serializer = getSerializer(type); const v = serializer.encode(value); - await this.pool.getConnection('cache/' + key).sendMessage(BrokerType.Set, { n: key, v }).ackThenClose(); + await this.pool.getConnection('cache/' + key).sendMessage(BrokerType.SetCache, { n: key, v, ttl: options.ttl }).ackThenClose(); + } + + async getCacheMeta(key: string): Promise<{ ttl: number } | undefined> { + const first = await this.pool.getConnection('cache/' + key) + .sendMessage(BrokerType.GetCacheMeta, { n: key }) + .firstThenClose(BrokerType.ResponseGetCacheMeta); + if ('missing' in first) return undefined; + return first; } - async getCache(key: string, type: Type): Promise { - const first: RpcMessage = await this.pool.getConnection('cache/' + key) + async getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined> { + const connection = this.pool.getConnection('cache/' + key); + + if (!connection.subscribedToInvalidations) { + connection.subscribedToInvalidations = this.onInvalidateCacheCallbacks; + await connection + .sendMessage(BrokerType.EnableInvalidationCacheMessages) + .ackThenClose(); + } + + const first = await connection + .sendMessage(BrokerType.GetCache, { n: key }) + .firstThenClose(BrokerType.ResponseGetCache); + + const serializer = getSerializer(type); + return first.v && first.ttl !== undefined ? { value: serializer.decode(first.v, 0), ttl: first.ttl } : undefined; + } + + async set(key: string, value: any, type: Type): Promise { + const serializer = getSerializer(type); + const v = serializer.encode(value); + await this.pool.getConnection('key/' + key).sendMessage(BrokerType.Set, { n: key, v }).ackThenClose(); + } + + async get(key: string, type: Type): Promise { + const first: RpcMessage = await this.pool.getConnection('key/' + key) .sendMessage(BrokerType.Get, { n: key }).firstThenClose(BrokerType.ResponseGet); if (first.buffer && first.buffer.byteLength > first.bodyOffset) { const serializer = getSerializer(type); @@ -177,7 +231,7 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { return response.v; } - async lock(id: string, options: BrokerLockOptions): Promise { + async lock(id: string, options: BrokerTimeOptionsResolved): Promise { const subject = this.pool.getConnection('lock/' + id) .sendMessage(BrokerType.Lock, { id, ttl: options.ttl, timeout: options.timeout }); await subject.waitNext(BrokerType.ResponseLock); //or throw error @@ -188,7 +242,7 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { }; } - async tryLock(id: string, options: BrokerLockOptions): Promise { + async tryLock(id: string, options: BrokerTimeOptionsResolved): Promise { const subject = this.pool.getConnection('lock/' + id) .sendMessage(BrokerType.TryLock, { id, ttl: options.ttl }); const message = await subject.waitNextMessage(); @@ -218,9 +272,13 @@ export class BrokerDeepkitAdapter implements BrokerAdapter { } async subscribe(key: string, callback: (message: any) => void, type: Type): Promise { + const connection = this.pool.getConnection('bus/' + key); + return await this._subscribe(connection, key, callback, type); + } + + protected async _subscribe(connection: BrokerDeepkitConnection, key: string, callback: (message: any) => void, type: Type): Promise { const serializer = getSerializer(type); - const connection = this.pool.getConnection('bus/' + key); const parsedCallback = (next: Uint8Array) => { try { const parsed = serializer.decode(next, 0); diff --git a/packages/broker/src/broker.ts b/packages/broker/src/broker.ts index 2a332739c..63bef21ed 100644 --- a/packages/broker/src/broker.ts +++ b/packages/broker/src/broker.ts @@ -1,43 +1,164 @@ -import { ReceiveType, reflect, ReflectionKind, resolveReceiveType, Type } from '@deepkit/type'; +import { ReceiveType, ReflectionKind, resolveReceiveType, Type } from '@deepkit/type'; import { EventToken } from '@deepkit/event'; +import { parse } from '@lukeed/ms'; +import { asyncOperation, formatError } from '@deepkit/core'; +import { ConsoleLogger, LoggerInterface } from '@deepkit/logger'; -export interface BrokerLockOptions { +export interface BrokerTimeOptions { /** - * Time to live in seconds. Default 2 minutes. - * - * The lock is automatically released after this time. - * This is to prevent deadlocks. + * Time to live in milliseconds. 0 means no ttl. + * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'. + */ + ttl: string | number; + + /** + * Timeout in milliseconds. 0 means no timeout. + * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'. + */ + timeout: number | string; +} + +export interface BrokerTimeOptionsResolved { + /** + * Time to live in milliseconds. 0 means no ttl. */ ttl: number; /** - * Timeout when acquiring the lock in seconds. Default 30 seconds. - * Ween a lock is not acquired after this time, an error is thrown. + * Timeout in milliseconds. 0 means no timeout. */ timeout: number; } +function parseBrokerTimeoutOptions(options: Partial): BrokerTimeOptionsResolved { + return { + ttl: parseTime(options.ttl) ?? 0, + timeout: parseTime(options.timeout) ?? 0, + }; +} + +function parseTime(value?: string | number): number | undefined { + if ('undefined' === typeof value) return; + if ('string' === typeof value) return value ? parse(value) || 0 : undefined; + return value; +} + +export interface BrokerCacheOptions { + /** + * Relative time to live in milliseconds. 0 means no ttl. + * + * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'. + */ + ttl: number | string; + + /** + * How many ms the cache is allowed to be stale. Set to 0 to disable stale cache. + * Default is 1000ms. + * Improves performance by serving slightly stale cache while the cache is being rebuilt. + */ + maxStale: number | string; + + /** + * How many ms the cache is allowed to be stored in-memory. Set to 0 to disable in-memory cache. + */ + inMemoryTtl: number | string; +} + +export interface BrokerCacheOptionsResolved extends BrokerCacheOptions { + ttl: number; + maxStale: number; + inMemoryTtl: number; +} + +export interface BrokerCacheItemOptions { + /** + * Relative time to live in milliseconds. 0 means no ttl. + * + * Value is either milliseconds or a string like '2 minutes', '8s', '24hours'. + */ + ttl: number | string; + + tags: string[]; + + /** + * How many ms the cache is allowed to be stale. Set to 0 to disable stale cache. + * Default is 1000ms. + * Improves performance by serving slightly stale cache while the cache is being rebuilt. + */ + maxStale: number | string; +} + +export interface BrokerCacheItemOptionsResolved extends BrokerCacheItemOptions { + ttl: number; + maxStale: number; +} + +function parseBrokerKeyOptions(options: Partial): BrokerCacheItemOptionsResolved { + return { + ttl: parseTime(options.ttl) ?? 30_000, + maxStale: parseTime(options.maxStale) ?? 1_000, + tags: options.tags || [], + }; +} + +function parseBrokerCacheOptions(options: Partial): BrokerCacheOptionsResolved { + return { + ttl: parseTime(options.ttl) ?? 60_000, + maxStale: parseTime(options.maxStale) ?? 10_000, + inMemoryTtl: parseTime(options.inMemoryTtl) ?? 60_000, + }; +} + export type Release = () => Promise; -export interface BrokerAdapter { - lock(id: string, options: BrokerLockOptions): Promise; +export interface BrokerInvalidateCacheMessage { + key: string; + ttl: number; +} + +export interface BrokerAdapterCache { + getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined>; + + getCacheMeta(key: string): Promise<{ ttl: number } | undefined>; + + setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise; + + invalidateCache(key: string): Promise; + + onInvalidateCache(callback: (message: BrokerInvalidateCacheMessage) => void): void; +} + +export interface BrokerAdapter extends BrokerAdapterCache { + lock(id: string, options: BrokerTimeOptionsResolved): Promise; isLocked(id: string): Promise; - tryLock(id: string, options: BrokerLockOptions): Promise; + tryLock(id: string, options: BrokerTimeOptionsResolved): Promise; - getCache(key: string, type: Type): Promise; + get(key: string, type: Type): Promise; - setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise; + set(key: string, value: any, type: Type): Promise; increment(key: string, value: any): Promise; + /** + * Publish a message on the bus aka pub/sub. + */ publish(name: string, message: any, type: Type): Promise; + /** + * Subscribe to messages on the bus aka pub/sub. + */ subscribe(name: string, callback: (message: any) => void, type: Type): Promise; + /** + * Consume messages from a queue. + */ consume(name: string, callback: (message: any) => Promise, options: { maxParallel: number }, type: Type): Promise; + /** + * Produce a message to a queue. + */ produce(name: string, message: any, type: Type, options?: { delay?: number, priority?: number }): Promise; disconnect(): Promise; @@ -45,22 +166,13 @@ export interface BrokerAdapter { export const onBrokerLock = new EventToken('broker.lock'); -export interface BrokerCacheOptions { - ttl: number; - tags: string[]; -} - export class CacheError extends Error { } export type BrokerBusChannel = [Name, Type]; -export type BrokerCacheKey = [Key, Parameters, Type]; - export type BrokerQueueChannel = [Name, Type]; -export type CacheBuilder> = (parameters: T[1], options: BrokerCacheOptions) => T[2] | Promise; - export class BrokerQueueMessage { public state: 'pending' | 'done' | 'failed' = 'pending'; public error?: Error; @@ -126,45 +238,142 @@ export class BrokerBus { } } -export class BrokerCache> { +export type CacheBuilder = () => T | Promise; + +interface CacheStoreEntry { + value: any; + ttl: number; //absolute timestamp in ms + inMemoryTtl?: number; //absolute timestamp in ms + built: number; //how many times the cache was built + building?: Promise; +} + +export class BrokerCacheStore { + /** + * This is a short-lived cache pool for the current process. + * Values are fetched from the broker when not available and stored here for a short time (configurable). + */ + cache = new Map(); + + constructor(public config: BrokerCacheOptionsResolved) { + } + + invalidate(key: string) { + if (this.config.maxStale) { + //don't delete immediately as it might be allowed to read stale cache while it is being rebuilt. + const entry = this.cache.get(key); + if (!entry) return; + entry.ttl = Date.now(); + setTimeout(() => { + if (this.cache.get(key) === entry) { + this.cache.delete(key); + } + }, this.config.maxStale); + } else { + // no stale reading allowed, so we can delete it immediately + this.cache.delete(key); + } + } + + set(key: string, value: CacheStoreEntry) { + if (!this.config.inMemoryTtl) return; + + const ttl = value.inMemoryTtl = Date.now() + this.config.inMemoryTtl; + this.cache.set(key, value); + + setTimeout(() => { + if (ttl === this.cache.get(key)?.ttl) { + // still the same value, so we can delete it + this.cache.delete(key); + } + }, this.config.inMemoryTtl); + } +} + +export class BrokerCacheItem { constructor( private key: string, private builder: CacheBuilder, - private options: BrokerCacheOptions, - private adapter: BrokerAdapter, + private options: BrokerCacheItemOptionsResolved, + private adapter: BrokerAdapterCache, + private store: BrokerCacheStore, private type: Type, + private logger: LoggerInterface, ) { } - protected getCacheKey(parameters: T[1]): string { - //this.key contains parameters e.g. user/:id, id comes from parameters.id. let's replace all of it. - //note: we could create JIT function for this, but it's probably not worth it. - return this.key.replace(/:([a-zA-Z0-9_]+)/g, (v, name) => { - if (!(name in parameters)) throw new CacheError(`Parameter ${name} not given`); - return String(parameters[name]); + protected build(entry: CacheStoreEntry): Promise { + return entry.building = asyncOperation(async (resolve) => { + entry.value = await this.builder(); + entry.ttl = Date.now() + this.options.ttl; + entry.built++; + entry.building = undefined; + resolve(); }); } - async set(parameters: T[1], value: T[2], options: Partial = {}) { - const cacheKey = this.getCacheKey(parameters); - await this.adapter.setCache(cacheKey, value, { ...this.options, ...options }, this.type); + async set(value: T) { + await this.adapter.setCache(this.key, value, this.options, this.type); + } + + async invalidate() { + await this.adapter.invalidateCache(this.key); } - async increment(parameters: T[1], value: number) { - const cacheKey = this.getCacheKey(parameters); - await this.adapter.increment(cacheKey, value); + async exists(): Promise { + const entry = this.store.cache.get(this.key); + if (entry) { + return entry.ttl > Date.now(); + } + + const l2Entry = await this.adapter.getCacheMeta(this.key); + return !!l2Entry && l2Entry.ttl > Date.now(); } - async get(parameters: T[1]): Promise { - const cacheKey = this.getCacheKey(parameters); - let entry = await this.adapter.getCache(cacheKey, this.type); - if (entry !== undefined) return entry; + async get(): Promise { + //read L1 + let entry = this.store.cache.get(this.key); + if (!entry) { + //read from L2 + const l2Entry = await this.adapter.getCache(this.key, this.type); + if (l2Entry) { + entry = { value: l2Entry.value, built: 0, ttl: l2Entry.ttl }; + this.store.set(this.key, entry); + } + } + + if (entry) { + //check ttl + const delta = entry.ttl - Date.now(); + if (delta <= 0) { + //cache is expired, rebuild it. + + //if entry.building is set, then this process already started rebuilding the cache. + // we simply wait and return the result + if (entry.building) { + // if the delta is small enough, we simply serve the old value + if (entry.built > 0 && delta < this.options.maxStale) return entry.value; + + await entry.building; + return entry.value; + } + } else { + //cache is still valid + return entry.value; + } + } else { + entry = { value: undefined, built: 0, ttl: Date.now() + this.options.ttl }; + this.store.set(this.key, entry); + } - const options: BrokerCacheOptions = { ...this.options }; - entry = await this.builder(parameters, options); - await this.adapter.setCache(cacheKey, entry, options, this.type); + //cache is expired or nearly created, rebuild it. + await this.build(entry); + //no need to wait for L2 to be updated, we can return the value already + this.adapter.setCache(this.key, entry.value, this.options, this.type).catch((error: any) => { + this.logger.warn(`Could not send cache to L2 ${this.key}: ${formatError(error)}`); + }); - return entry; + return entry.value; } } @@ -178,7 +387,7 @@ export class BrokerLock { constructor( private id: string, private adapter: BrokerAdapter, - private options: BrokerLockOptions, + private options: BrokerTimeOptionsResolved, ) { } @@ -231,9 +440,31 @@ export class BrokerLock { } } +export class BrokerCache { + private store = new BrokerCacheStore(this.config); + + constructor(private adapter: BrokerAdapterCache, private config: BrokerCacheOptionsResolved, private logger: LoggerInterface) { + this.adapter.onInvalidateCache((message) => { + this.store.invalidate(message.key); + }); + } + + item(key: string, builder: CacheBuilder, options?: Partial, type?: ReceiveType): BrokerCacheItem { + return new BrokerCacheItem(key, builder, parseBrokerKeyOptions(Object.assign({}, this.config, options)), this.adapter, this.store, resolveReceiveType(type), this.logger); + } +} + +export interface BrokerConfig { + cache?: Partial; +} + export class Broker { + public readonly cache: BrokerCache = new BrokerCache(this.adapter, parseBrokerCacheOptions(this.config.cache || {}), this.logger); + constructor( - private readonly adapter: BrokerAdapter + private readonly adapter: BrokerAdapter, + private readonly config: Partial = {}, + private readonly logger: LoggerInterface = new ConsoleLogger(), ) { } @@ -242,55 +473,17 @@ export class Broker { * * The object returned can be used to acquire and release the lock. */ - public lock(id: string, options: Partial = {}): BrokerLock { - return new BrokerLock(id, this.adapter, Object.assign({ ttl: 60 * 2, timeout: 30 }, options)); + public lock(id: string, options: Partial = {}): BrokerLock { + const parsedOptions = parseBrokerTimeoutOptions(options); + parsedOptions.ttl ||= 60 * 2 * 1000; //2 minutes + parsedOptions.timeout ||= 30 * 1000; //30 seconds + return new BrokerLock(id, this.adapter, parsedOptions); } public disconnect(): Promise { return this.adapter.disconnect(); } - protected cacheProvider: { [path: string]: (...args: any[]) => any } = {}; - - public provideCache>(provider: (options: T[1]) => T[2] | Promise, type?: ReceiveType) { - type = resolveReceiveType(type); - if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`); - if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`); - const path = String(type.types[0].type.literal); - this.cacheProvider[path] = provider; - } - - public cache>(type?: ReceiveType): BrokerCache { - type = resolveReceiveType(type); - if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`); - if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`); - const path = String(type.types[0].type.literal); - const provider = this.cacheProvider[path]; - if (!provider) throw new CacheError(`No cache provider for cache ${type.typeName} (${path}) registered`); - - return new BrokerCache(path, provider, { ttl: 30, tags: [] }, this.adapter, type.types[2].type); - } - - public async get(key: string, builder: (options: BrokerCacheOptions) => Promise, type?: ReceiveType): Promise { - if (!type) { - //type not manually provided via Broker.get, so we try to extract it from the builder. - const fn = reflect(builder); - if (fn.kind !== ReflectionKind.function) throw new CacheError(`Can not detect type of builder function`); - type = fn.return; - while (type.kind === ReflectionKind.promise) type = type.type; - } else { - type = resolveReceiveType(type); - } - - const cache = await this.adapter.getCache(key, type); - if (cache !== undefined) return cache; - - const options: BrokerCacheOptions = { ttl: 30, tags: [] }; - const value = await builder(options); - await this.adapter.setCache(key, value, options, type); - return value; - } - public bus(path: string, type?: ReceiveType): BrokerBus { type = resolveReceiveType(type); return new BrokerBus(path, this.adapter, type); diff --git a/packages/broker/src/kernel.ts b/packages/broker/src/kernel.ts index a2298dd90..b34e3f5b1 100644 --- a/packages/broker/src/kernel.ts +++ b/packages/broker/src/kernel.ts @@ -17,16 +17,20 @@ import { brokerDelete, brokerEntityFields, brokerGet, + brokerGetCache, brokerIncrement, + brokerInvalidateCacheMessage, brokerLock, brokerLockId, BrokerQueueMessageHandled, BrokerQueuePublish, BrokerQueueResponseHandleMessage, BrokerQueueSubscribe, + brokerResponseGetCache, + brokerResponseGetCacheMeta, brokerResponseIncrement, brokerResponseIsLock, - brokerSet, + brokerSetCache, BrokerType, QueueMessage, QueueMessageState @@ -61,6 +65,8 @@ export class BrokerConnection extends RpcKernelBaseConnection { for (const c of this.subscribedChannels) { this.state.unsubscribe(c, this); } + arrayRemoveItem(this.state.invalidationCacheMessageConnections, this); + for (const lock of this.locks.values()) { lock.unlock(); } @@ -194,9 +200,15 @@ export class BrokerConnection extends RpcKernelBaseConnection { response.ack(); break; } + case BrokerType.SetCache: { + const body = message.parseBody(); + this.state.setCache(body.n, body.v, body.ttl); + response.ack(); + break; + } case BrokerType.Set: { - const body = message.parseBody(); - this.state.set(body.n, body.v); + const body = message.parseBody(); + this.state.setKey(body.n, body.v); response.ack(); break; } @@ -208,22 +220,72 @@ export class BrokerConnection extends RpcKernelBaseConnection { } case BrokerType.Delete: { const body = message.parseBody(); - this.state.delete(body.n); + this.state.deleteKey(body.n); + response.ack(); + break; + } + case BrokerType.InvalidateCache: { + const body = message.parseBody(); + const entry = this.state.getCache(body.n); + if (entry && this.state.invalidationCacheMessageConnections.length) { + this.state.deleteCache(body.n); + const message = createRpcMessage( + 0, BrokerType.ResponseInvalidationCache, + { key: body.n, ttl: entry.ttl }, + RpcMessageRouteType.server + ); + + for (const connection of this.state.invalidationCacheMessageConnections) { + connection.writer.write(message); + } + } response.ack(); break; } + case BrokerType.DeleteCache: { + const body = message.parseBody(); + this.state.deleteCache(body.n); + response.ack(); + break; + } + case BrokerType.GetCache: { + const body = message.parseBody(); + const v = this.state.getCache(body.n); + response.reply(BrokerType.ResponseGetCache, v || {}); + break; + } + case BrokerType.GetCacheMeta: { + const body = message.parseBody(); + const v = this.state.getCache(body.n); + response.reply(BrokerType.ResponseGetCacheMeta, v ? {ttl: v.ttl} : { missing: true }); + break; + } case BrokerType.Get: { const body = message.parseBody(); - const v = this.state.get(body.n); + const v = this.state.getKey(body.n); response.replyBinary(BrokerType.ResponseGet, v); break; } + case BrokerType.EnableInvalidationCacheMessages: { + this.state.invalidationCacheMessageConnections.push(this); + response.ack(); + break; + } } } } export class BrokerState { - public setStore = new Map(); + /** + * Simple key/value store. + */ + public keyStore = new Map(); + + /** + * Cache store. + */ + public cacheStore = new Map(); + public subscriptions = new Map(); public entityFields = new Map>(); @@ -236,6 +298,11 @@ export class BrokerState { public snapshotPath = './broker-snapshot.bson'; public snapshotting = false; + /** + * All connections in this list are notified about cache invalidations. + */ + public invalidationCacheMessageConnections: BrokerConnection[] = []; + protected lastSnapshotTimeout?: any; protected snapshot() { @@ -285,7 +352,7 @@ export class BrokerState { } public unsubscribeEntityFields(name: string, fields: string[]) { - let store = this.entityFields.get(name); + const store = this.entityFields.get(name); if (!store) return; let changed = false; for (const field of fields) { @@ -413,25 +480,38 @@ export class BrokerState { //todo: handle delays and retries } - public set(id: string, data: Uint8Array) { - this.setStore.set(id, data); + public setCache(id: string, data: Uint8Array, ttl: number) { + this.cacheStore.set(id, { data, ttl }); + } + + public getCache(id: string): { data: Uint8Array, ttl: number } | undefined { + return this.cacheStore.get(id); + } + + public deleteCache(id: string) { + this.cacheStore.delete(id); } public increment(id: string, v?: number): number { - const buffer = this.setStore.get(id); + const buffer = this.keyStore.get(id); const float64 = buffer ? new Float64Array(buffer.buffer, buffer.byteOffset) : new Float64Array(1); float64[0] += v || 1; - if (!buffer) this.setStore.set(id, new Uint8Array(float64.buffer)); + if (!buffer) this.keyStore.set(id, new Uint8Array(float64.buffer)); return float64[0]; } - public get(id: string): Uint8Array | undefined { - return this.setStore.get(id); + public setKey(id: string, data: Uint8Array) { + this.keyStore.set(id, data); + } + + public getKey(id: string): Uint8Array | undefined { + return this.keyStore.get(id); } - public delete(id: string) { - this.setStore.delete(id); + public deleteKey(id: string) { + this.keyStore.delete(id); } + } export class BrokerKernel extends RpcKernel { diff --git a/packages/broker/src/model.ts b/packages/broker/src/model.ts index ecd0b2bcd..3608b5162 100644 --- a/packages/broker/src/model.ts +++ b/packages/broker/src/model.ts @@ -21,10 +21,18 @@ export const enum BrokerType { Set, Get, + ResponseGet, Increment, ResponseIncrement, Delete, - ResponseGet, + + InvalidateCache, + SetCache, + GetCache, + ResponseGetCache, + GetCacheMeta, + ResponseGetCacheMeta, + DeleteCache, Lock, //110 Unlock, //111 @@ -34,6 +42,9 @@ export const enum BrokerType { ResponseLockFailed, ResponseIsLock, + EnableInvalidationCacheMessages, + ResponseInvalidationCache, + QueuePublish, QueueSubscribe, QueueUnsubscribe, @@ -65,14 +76,39 @@ export interface brokerSet { v: Uint8Array, } -export interface brokerResponseGet { +export interface brokerInvalidateCache { + n: string, +} + +export interface brokerSetCache { + n: string, + v: Uint8Array, + ttl: number; + tags?: string[]; +} + +export interface brokerInvalidateCacheMessage { + key: string; + ttl: number; +} + +export interface brokerResponseGetCache { v?: Uint8Array, + ttl?: number, } +export type brokerResponseGetCacheMeta = { + ttl: number, +} | { missing: true }; + export interface brokerGet { n: string; } +export interface brokerGetCache { + n: string; +} + export interface brokerBusPublish { c: string, v: Uint8Array, diff --git a/packages/broker/tests/broker.spec.ts b/packages/broker/tests/broker.spec.ts index 50729398e..37245524c 100644 --- a/packages/broker/tests/broker.spec.ts +++ b/packages/broker/tests/broker.spec.ts @@ -1,34 +1,26 @@ -import { expect, jest, test } from '@jest/globals'; -import { Broker, BrokerAdapter, BrokerBusChannel, BrokerCacheKey, BrokerQueueChannel } from '../src/broker.js'; +import { afterEach, expect, jest, test } from '@jest/globals'; +import { Broker, BrokerAdapter, BrokerBusChannel, BrokerQueueChannel } from '../src/broker.js'; import { BrokerMemoryAdapter } from '../src/adapters/memory-adapter.js'; +import { sleep } from '@deepkit/core'; jest.setTimeout(10000); -export let adapterFactory: () => Promise = async () => new BrokerMemoryAdapter(); +let lastAdapter: BrokerAdapter | undefined; +export let adapterFactory: () => Promise = async () => lastAdapter = new BrokerMemoryAdapter(); export function setAdapterFactory(factory: () => Promise) { adapterFactory = factory; } -type User = { id: number, username: string, created: Date }; - -test('cache1', async () => { - const broker = new Broker(await adapterFactory()); - - - type UserCache = BrokerCacheKey; - broker.provideCache((parameters) => { - return { id: parameters.id, username: 'peter', created: new Date }; - }); - - const userCache = broker.cache(); +afterEach(() => { + if (lastAdapter) lastAdapter.disconnect(); +}); - const entry = await userCache.get({ id: 2 }); - expect(entry).toEqual({ id: 2, username: 'peter', created: expect.any(Date) }); -}) +type User = { id: number, username: string, created: Date }; test('cache2', async () => { const broker = new Broker(await adapterFactory()); + const cache = broker.cache; const created = new Date; let called = 0; @@ -36,22 +28,52 @@ test('cache2', async () => { called++; return { id: 2, username: 'peter', created }; }; - const entry1 = await broker.get('user/' + 2, builder); + + const item = cache.item('user/' + 2, builder); + + const entry1 = await item.get(); expect(called).toBe(1); expect(entry1).toEqual({ id: 2, username: 'peter', created }); - const entry2 = await broker.get('user/' + 2, builder); + const entry2 = await item.get(); expect(called).toBe(1); expect(entry2).toEqual({ id: 2, username: 'peter', created }); + + const entry3 = await cache.item('user/' + 2, builder).get(); + expect(called).toBe(1); + expect(entry3).toEqual({ id: 2, username: 'peter', created }); + + await item.invalidate(); + const entry4 = await item.get(); + expect(called).toBe(2); + expect(entry4).toEqual({ id: 2, username: 'peter', created }); }); test('cache3', async () => { const broker = new Broker(await adapterFactory()); + const cache = broker.cache; - const entry = await broker.get('user/' + 2, async (): Promise => { - return { id: 2, username: 'peter', created: new Date }; - }); - expect(entry).toEqual({ id: 2, username: 'peter', created: expect.any(Date) }); + let build = 0; + const item = cache.item('key', async () => { + return build++; + }, { ttl: '100ms' }); + + expect(await item.exists()).toBe(false); + + const entry1 = await item.get(); + expect(entry1).toBe(0); + expect(await item.exists()).toBe(true); + + await sleep(0.01); + + const entry2 = await item.get(); + expect(entry2).toBe(0); + + await sleep(0.105); + + expect(await item.exists()).toBe(false); + const entry3 = await item.get(); + expect(entry3).toBe(1); }); test('bus', async () => { diff --git a/packages/framework/src/application-server.ts b/packages/framework/src/application-server.ts index 5dae848fa..b6d05cff6 100644 --- a/packages/framework/src/application-server.ts +++ b/packages/framework/src/application-server.ts @@ -231,7 +231,7 @@ export class ApplicationServer { cluster.on('exit', (w) => { this.onlineWorkers--; if (this.stopping) return; - this.logger.warning(`Worker ${w.id} died. Restarted`); + this.logger.warn(`Worker ${w.id} died. Restarted`); cluster.fork(); }); }); @@ -240,16 +240,16 @@ export class ApplicationServer { const stopServer = (signal: string) => async () => { killRequests++; if (killRequests === 3) { - this.logger.warning(`Received ${signal}. Force stopping server ...`); + this.logger.warn(`Received ${signal}. Force stopping server ...`); process.exit(1); return; } if (this.stopping) { - this.logger.warning(`Received ${signal}. Stopping already in process. Try again to force stop.`); + this.logger.warn(`Received ${signal}. Stopping already in process. Try again to force stop.`); return; } this.stopping = true; - this.logger.warning(`Received ${signal}. Stopping server ...`); + this.logger.warn(`Received ${signal}. Stopping server ...`); await this.stopWorkers(); this.stopResolver(); setTimeout(() => { @@ -296,16 +296,16 @@ export class ApplicationServer { const stopServer = (signal: string) => async () => { killRequests++; if (killRequests === 3) { - this.logger.warning(`Received ${signal}. Force stopping server ...`); + this.logger.warn(`Received ${signal}. Force stopping server ...`); process.exit(1); return; } if (this.stopping) { - this.logger.warning(`Received ${signal}. Stopping already in process. Try again to force stop.`); + this.logger.warn(`Received ${signal}. Stopping already in process. Try again to force stop.`); return; } this.stopping = true; - this.logger.warning('Received SIGINT. Stopping server ...'); + this.logger.warn('Received SIGINT. Stopping server ...'); await this.eventDispatcher.dispatch(onServerShutdown, new ServerShutdownEvent()); await this.eventDispatcher.dispatch(onServerMainShutdown, new ServerShutdownEvent()); if (this.httpWorker) await this.httpWorker.close(true); diff --git a/packages/http/src/http.ts b/packages/http/src/http.ts index 3c65952fb..b96ef7ce2 100644 --- a/packages/http/src/http.ts +++ b/packages/http/src/http.ts @@ -601,7 +601,7 @@ export class HttpListener { const timeout = middlewares[i].timeout; if (timeout !== undefined && timeout > 0) { lastTimer = setTimeout(() => { - logger.warning(`Middleware timed out. Increase the timeout or fix the middleware. (${middlewares[i].fn})`); + logger.warn(`Middleware timed out. Increase the timeout or fix the middleware. (${middlewares[i].fn})`); next(); }, timeout); } diff --git a/packages/logger/src/logger.ts b/packages/logger/src/logger.ts index 8e8fbcf7b..4ed60bd0b 100644 --- a/packages/logger/src/logger.ts +++ b/packages/logger/src/logger.ts @@ -11,7 +11,7 @@ import style from 'ansi-styles'; import format from 'format-util'; import { arrayRemoveItem, ClassType } from '@deepkit/core'; -import { FactoryProvider, Inject, TransientInjectionTarget } from '@deepkit/injector'; +import { Inject, TransientInjectionTarget } from '@deepkit/injector'; export enum LoggerLevel { none, @@ -52,7 +52,7 @@ export class ConsoleTransport implements LoggerTransport { } export class JSONTransport implements LoggerTransport { - out: {write: (v: string) => any} = process.stdout; + out: { write: (v: string) => any } = process.stdout; write(message: LogMessage) { this.out.write(JSON.stringify({ @@ -180,7 +180,7 @@ export interface LoggerInterface { error(...message: any[]): void; - warning(...message: any[]): void; + warn(...message: any[]): void; log(...message: any[]): void; @@ -294,7 +294,7 @@ export class Logger implements LoggerInterface { this.send(message, LoggerLevel.error); } - warning(...message: any[]) { + warn(...message: any[]) { this.send(message, LoggerLevel.warning); } @@ -311,6 +311,15 @@ export class Logger implements LoggerInterface { } } +/** + * Logger with pre-configured console transport. + */ +export class ConsoleLogger extends Logger { + constructor() { + super([new ConsoleTransport]); + } +} + export type ScopedLogger = Inject; export const ScopedLogger = { provide: 'scoped-logger', diff --git a/tslint.json b/tslint.json index dedcca0a7..154b6fc70 100644 --- a/tslint.json +++ b/tslint.json @@ -2,7 +2,7 @@ "rules": { "arrow-return-shorthand": true, "callable-types": true, - "class-name": true, + "class-name": false, "comment-format": false, "curly": false, "deprecation": {