Skip to content

Commit

Permalink
refactor(broker): new easier cache API
Browse files Browse the repository at this point in the history
add also BrokerCacheItem.exists().

Rework cache API to use multi-level caches: L1 is in-memory, and L2 is on the broker server (deepkit broker, Redis, ...). L1 is synced via invalidation bus messages.

refactor(logger): rename warning to warn to be consistent with console.* API
  • Loading branch information
marcj committed Nov 17, 2023
1 parent dc5d6dd commit 641187e
Show file tree
Hide file tree
Showing 11 changed files with 564 additions and 152 deletions.
11 changes: 11 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/broker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
"scripts": {
"build": "echo '{\"type\": \"module\"}' > ./dist/esm/package.json"
},
"dependencies": {
"@lukeed/ms": "^2.0.1"
},
"peerDependencies": {
"@deepkit/app": "^1.0.1-alpha.40",
"@deepkit/bson": "^1.0.1-alpha.40",
Expand Down
74 changes: 66 additions & 8 deletions packages/broker/src/adapters/deepkit-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
import { BrokerAdapter, BrokerCacheOptions, BrokerLockOptions, BrokerQueueMessage, Release } from '../broker.js';
import { BrokerAdapter, BrokerCacheItemOptionsResolved, BrokerQueueMessage, BrokerTimeOptionsResolved, Release } from '../broker.js';
import { getTypeJitContainer, ReflectionKind, Type, TypePropertySignature } from '@deepkit/type';
import {
brokerBusPublish,
brokerBusResponseHandleMessage,
brokerBusSubscribe,
brokerGet,
brokerGetCache,
brokerIncrement,
brokerInvalidateCache,
brokerInvalidateCacheMessage,
brokerLock,
brokerLockId,
BrokerQueueMessageHandled,
BrokerQueuePublish,
BrokerQueueResponseHandleMessage,
BrokerQueueSubscribe,
BrokerQueueUnsubscribe,
brokerResponseGetCache,
brokerResponseGetCacheMeta,
brokerResponseIncrement,
brokerResponseIsLock,
brokerSet,
brokerSetCache,
BrokerType
} from '../model.js';
import { ClientTransportAdapter, createRpcMessage, RpcBaseClient, RpcMessage, RpcMessageRouteType, RpcWebSocketClientAdapter } from '@deepkit/rpc';
Expand Down Expand Up @@ -67,6 +73,8 @@ export class BrokerDeepkitConnection extends RpcBaseClient {
activeChannels = new Map<string, { listeners: number, callbacks: ((v: Uint8Array) => void)[] }>();
consumers = new Map<string, { listeners: number, callbacks: ((id: number, v: Uint8Array) => void)[] }>();

subscribedToInvalidations?: ((message: brokerInvalidateCacheMessage) => void)[];

protected onMessage(message: RpcMessage) {
if (message.routeType === RpcMessageRouteType.server) {
if (message.type === BrokerType.EntityFields) {
Expand All @@ -78,6 +86,11 @@ export class BrokerDeepkitConnection extends RpcBaseClient {
const channel = this.activeChannels.get(body.c);
if (!channel) return;
for (const callback of channel.callbacks) callback(body.v);
} else if (message.type === BrokerType.ResponseInvalidationCache) {
const body = message.parseBody<brokerInvalidateCacheMessage>();
if (this.subscribedToInvalidations) {
for (const callback of this.subscribedToInvalidations) callback(body);
}
} else if (message.type === BrokerType.QueueResponseHandleMessage) {
const body = message.parseBody<BrokerQueueResponseHandleMessage>();
const consumer = this.consumers.get(body.c);
Expand Down Expand Up @@ -140,6 +153,7 @@ export class BrokerDeepkitPool {
*/
export class BrokerDeepkitAdapter implements BrokerAdapter {
protected pool = new BrokerDeepkitPool(this.options);
protected onInvalidateCacheCallbacks: ((message: brokerInvalidateCacheMessage) => void)[] = [];

constructor(public options: BrokerDeepkitAdapterOptions) {
}
Expand All @@ -148,14 +162,54 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
await this.pool.disconnect();
}

async setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise<void> {
onInvalidateCache(callback: (message: brokerInvalidateCacheMessage) => void): void {
this.onInvalidateCacheCallbacks.push(callback);
}

async invalidateCache(key: string): Promise<void> {
await this.pool.getConnection('cache/' + key).sendMessage<brokerInvalidateCache>(BrokerType.InvalidateCache, { n: key }).ackThenClose();
}

async setCache(key: string, value: any, options: BrokerCacheItemOptionsResolved, type: Type): Promise<void> {
const serializer = getSerializer(type);
const v = serializer.encode(value);
await this.pool.getConnection('cache/' + key).sendMessage<brokerSet>(BrokerType.Set, { n: key, v }).ackThenClose();
await this.pool.getConnection('cache/' + key).sendMessage<brokerSetCache>(BrokerType.SetCache, { n: key, v, ttl: options.ttl }).ackThenClose();
}

async getCacheMeta(key: string): Promise<{ ttl: number } | undefined> {
const first = await this.pool.getConnection('cache/' + key)
.sendMessage<brokerGetCache>(BrokerType.GetCacheMeta, { n: key })
.firstThenClose<brokerResponseGetCacheMeta>(BrokerType.ResponseGetCacheMeta);
if ('missing' in first) return undefined;
return first;
}

async getCache(key: string, type: Type): Promise<any> {
const first: RpcMessage = await this.pool.getConnection('cache/' + key)
async getCache(key: string, type: Type): Promise<{ value: any, ttl: number } | undefined> {
const connection = this.pool.getConnection('cache/' + key);

if (!connection.subscribedToInvalidations) {
connection.subscribedToInvalidations = this.onInvalidateCacheCallbacks;
await connection
.sendMessage(BrokerType.EnableInvalidationCacheMessages)
.ackThenClose();
}

const first = await connection
.sendMessage<brokerGetCache>(BrokerType.GetCache, { n: key })
.firstThenClose<brokerResponseGetCache>(BrokerType.ResponseGetCache);

const serializer = getSerializer(type);
return first.v && first.ttl !== undefined ? { value: serializer.decode(first.v, 0), ttl: first.ttl } : undefined;
}

async set(key: string, value: any, type: Type): Promise<void> {
const serializer = getSerializer(type);
const v = serializer.encode(value);
await this.pool.getConnection('key/' + key).sendMessage<brokerSet>(BrokerType.Set, { n: key, v }).ackThenClose();
}

async get(key: string, type: Type): Promise<any> {
const first: RpcMessage = await this.pool.getConnection('key/' + key)
.sendMessage<brokerGet>(BrokerType.Get, { n: key }).firstThenClose(BrokerType.ResponseGet);
if (first.buffer && first.buffer.byteLength > first.bodyOffset) {
const serializer = getSerializer(type);
Expand All @@ -177,7 +231,7 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
return response.v;
}

async lock(id: string, options: BrokerLockOptions): Promise<undefined | Release> {
async lock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release> {
const subject = this.pool.getConnection('lock/' + id)
.sendMessage<brokerLock>(BrokerType.Lock, { id, ttl: options.ttl, timeout: options.timeout });
await subject.waitNext(BrokerType.ResponseLock); //or throw error
Expand All @@ -188,7 +242,7 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
};
}

async tryLock(id: string, options: BrokerLockOptions): Promise<undefined | Release> {
async tryLock(id: string, options: BrokerTimeOptionsResolved): Promise<undefined | Release> {
const subject = this.pool.getConnection('lock/' + id)
.sendMessage<brokerLock>(BrokerType.TryLock, { id, ttl: options.ttl });
const message = await subject.waitNextMessage();
Expand Down Expand Up @@ -218,9 +272,13 @@ export class BrokerDeepkitAdapter implements BrokerAdapter {
}

async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<Release> {
const connection = this.pool.getConnection('bus/' + key);
return await this._subscribe(connection, key, callback, type);
}

protected async _subscribe(connection: BrokerDeepkitConnection, key: string, callback: (message: any) => void, type: Type): Promise<Release> {
const serializer = getSerializer(type);

const connection = this.pool.getConnection('bus/' + key);
const parsedCallback = (next: Uint8Array) => {
try {
const parsed = serializer.decode(next, 0);
Expand Down
Loading

0 comments on commit 641187e

Please sign in to comment.