From a150e3d5132754d08eea194f6789675f05dd23e6 Mon Sep 17 00:00:00 2001 From: "Marc J. Schmidt" Date: Wed, 18 Oct 2023 23:50:06 +0200 Subject: [PATCH] feature(broker): introduce adapter abstraction + start queue features This makes it possible to use deepkit/broker with Redis/AWS Queues/etc by implementing a new adapter. This adds also queue support for Deepkit Broker server. Not yet finished. --- .../broker/src/adapters/deepkit-adapter.ts | 227 ++++++++++++++++++ .../broker/src/adapters/memory-adapter.ts | 73 +----- packages/broker/src/broker.ts | 111 +++++++-- packages/broker/src/client.ts | 14 +- packages/broker/src/kernel.ts | 160 +++++++++++- packages/broker/src/model.ts | 73 +++++- packages/broker/src/snaptshot.ts | 67 ++++++ packages/broker/tests/broker.spec.ts | 30 ++- packages/broker/tests/deepkit-broker.spec.ts | 21 ++ packages/broker/tests/snapshot.spec.ts | 45 ++++ 10 files changed, 706 insertions(+), 115 deletions(-) create mode 100644 packages/broker/src/adapters/deepkit-adapter.ts create mode 100644 packages/broker/src/snaptshot.ts create mode 100644 packages/broker/tests/deepkit-broker.spec.ts create mode 100644 packages/broker/tests/snapshot.spec.ts diff --git a/packages/broker/src/adapters/deepkit-adapter.ts b/packages/broker/src/adapters/deepkit-adapter.ts new file mode 100644 index 000000000..0d138e644 --- /dev/null +++ b/packages/broker/src/adapters/deepkit-adapter.ts @@ -0,0 +1,227 @@ +import { BrokerAdapter, BrokerCacheOptions, BrokerLockOptions, BrokerQueueMessage, Release } from '../broker.js'; +import { getTypeJitContainer, ReflectionKind, Type, TypePropertySignature } from '@deepkit/type'; +import { + brokerBusPublish, + brokerBusResponseHandleMessage, + brokerBusSubscribe, + brokerGet, + brokerIncrement, + brokerLock, + BrokerQueueMessageHandled, + BrokerQueuePublish, + BrokerQueueResponseHandleMessage, + BrokerQueueSubscribe, + BrokerQueueUnsubscribe, + brokerResponseIncrement, + brokerSet, + BrokerType +} from '../model.js'; +import { createRpcMessage, RpcBaseClient, RpcMessage, RpcMessageRouteType } from '@deepkit/rpc'; +import { deserializeBSON, deserializeBSONWithoutOptimiser, getBSONDeserializer, getBSONSerializer, serializeBSON } from '@deepkit/bson'; +import { arrayRemoveItem } from '@deepkit/core'; + +interface TypeSerialize { + encode(v: any): Uint8Array; + + decode(v: Uint8Array, offset: number): any; +} + +function getSerializer(type: Type): TypeSerialize { + const container = getTypeJitContainer(type); + if (container.brokerSerializer) return container.brokerSerializer; + + const standaloneType = type.kind === ReflectionKind.objectLiteral || (type.kind === ReflectionKind.class && type.types.length); + + if (!standaloneType) { + //BSON only supports objects, so we wrap it into a {v: type} object. + type = { + kind: ReflectionKind.objectLiteral, + types: [{ + kind: ReflectionKind.propertySignature, + name: 'v', + type: type, + } as TypePropertySignature] + }; + + const decoder = getBSONDeserializer(undefined, type); + const encoder = getBSONSerializer(undefined, type); + + return container.brokerSerializer = { + decode: (v: Uint8Array, offset: number) => decoder(v, offset).v, + encode: (v: any) => encoder({ v }), + }; + } + + const decoder = getBSONDeserializer(undefined, type); + const encoder = getBSONSerializer(undefined, type); + + return container.brokerSerializer = { + decode: (v: Uint8Array, offset: number) => decoder(v, offset), + encode: (v: any) => encoder(v), + }; +} + +/** + * This is the Broker adapter for Deepkit Broker server. + */ +export class BrokerDeepkitAdapter extends RpcBaseClient implements BrokerAdapter { + protected activeChannels = new Map void)[] }>(); + protected consumers = new Map void)[] }>(); + + protected onMessage(message: RpcMessage) { + if (message.routeType === RpcMessageRouteType.server) { + if (message.type === BrokerType.EntityFields) { + // const fields = message.parseBody(); + // this.knownEntityFields.set(fields.name, fields.fields); + this.transporter.send(createRpcMessage(message.id, BrokerType.Ack, undefined, RpcMessageRouteType.server)); + } else if (message.type === BrokerType.ResponseSubscribeMessage) { + const body = message.parseBody(); + const channel = this.activeChannels.get(body.c); + if (!channel) return; + for (const callback of channel.callbacks) callback(body.v); + } else if (message.type === BrokerType.QueueResponseHandleMessage) { + const body = message.parseBody(); + const consumer = this.consumers.get(body.c); + if (!consumer) return; + for (const callback of consumer.callbacks) callback(body.id, body.v); + } + } else { + super.onMessage(message); + } + } + + async disconnect(): Promise { + super.disconnect(); + } + + async setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise { + const serializer = getSerializer(type); + await this.sendMessage(BrokerType.Set, { n: key, v: serializer.encode(value) }).ackThenClose(); + } + + async getCache(key: string, type: Type): Promise { + const first: RpcMessage = await this.sendMessage(BrokerType.Get, { n: key }).firstThenClose(BrokerType.ResponseGet); + if (first.buffer && first.buffer.byteLength > first.bodyOffset) { + const serializer = getSerializer(type); + return serializer.decode(first.buffer, first.bodyOffset); + } + } + + async increment(key: string, value: any): Promise { + const response = await this.sendMessage(BrokerType.Increment, { n: key, v: value }) + .waitNext(BrokerType.ResponseIncrement); + return response.v; + } + + async lock(id: string, options: BrokerLockOptions): Promise { + const subject = this.sendMessage(BrokerType.Lock, { id, ttl: options.ttl, timeout: options.timeout }); + await subject.waitNext(BrokerType.ResponseLock); //or throw error + + return async () => { + await subject.send(BrokerType.Unlock).ackThenClose(); + subject.release(); + }; + } + + async tryLock(id: string, options: BrokerLockOptions): Promise { + const subject = this.sendMessage(BrokerType.TryLock, { id, ttl: options.ttl }); + const message = await subject.waitNextMessage(); + if (message.type === BrokerType.ResponseLockFailed) { + subject.release(); + return; + } + + if (message.type === BrokerType.ResponseLock) { + return async () => { + await subject.send(BrokerType.Unlock).ackThenClose(); + }; + } + + throw new Error(`Invalid message returned. Expected Lock, but got ${message.type}`); + } + + async publish(key: string, message: any, type: Type): Promise { + const serializer = getSerializer(type); + const v = serializer.encode(message); + await this.sendMessage(BrokerType.Publish, { c: key, v: v }) + .ackThenClose(); + + return undefined; + } + + async subscribe(key: string, callback: (message: any) => void, type: Type): Promise { + const serializer = getSerializer(type); + + const parsedCallback = (next: Uint8Array) => { + try { + const parsed = serializer.decode(next, 0); + callback(parsed); + } catch (error: any) { + console.log('message', Buffer.from(next).toString('utf8'), deserializeBSONWithoutOptimiser(next)); + console.error(`Could not parse channel message ${key}: ${error}`); + } + }; + + let channel = this.activeChannels.get(key); + if (!channel) { + channel = { + listeners: 0, + callbacks: [], + }; + this.activeChannels.set(key, channel); + } + + channel.listeners++; + channel.callbacks.push(parsedCallback); + + if (channel.listeners === 1) { + await this.sendMessage(BrokerType.Subscribe, { c: key }) + .ackThenClose(); + } + + return async () => { + channel!.listeners--; + arrayRemoveItem(channel!.callbacks, parsedCallback); + if (channel!.listeners === 0) { + await this.sendMessage(BrokerType.Unsubscribe, { c: key }) + .ackThenClose(); + } + }; + } + + async produce(key: string, message: T, type: Type, options?: { delay?: number; priority?: number; }): Promise { + await this.sendMessage(BrokerType.QueuePublish, { + c: key, + v: serializeBSON(message, undefined, type), + delay: options?.delay, + priority: options?.priority + }).ackThenClose(); + } + + async consume(key: string, callback: (message: BrokerQueueMessage) => Promise, options: { maxParallel: number }, type: Type): Promise { + // when this is acked, we start receiving messages via BrokerQueueResponseHandleMessage + await this.sendMessage(BrokerType.QueueSubscribe, { c: key, maxParallel: options.maxParallel }) + .ackThenClose(); + + this.consumers.set(key, { + listeners: 1, + callbacks: [async (id: number, next: Uint8Array) => { + const data = deserializeBSON(next, 0, undefined, type); + const message = new BrokerQueueMessage(key, data); + await callback(message); + + await this.sendMessage(BrokerType.QueueMessageHandled, { + id, c: key, + success: message.state === 'done', + error: message.error ? String(message.error) : undefined, + delay: message.delayed, + }).ackThenClose(); + }] + }); + + return async () => { + await this.sendMessage(BrokerType.QueueUnsubscribe, { c: key }) + .ackThenClose(); + }; + } +} diff --git a/packages/broker/src/adapters/memory-adapter.ts b/packages/broker/src/adapters/memory-adapter.ts index 9479f9a7c..8e4842dbc 100644 --- a/packages/broker/src/adapters/memory-adapter.ts +++ b/packages/broker/src/adapters/memory-adapter.ts @@ -1,70 +1,13 @@ -import { BrokerAdapter, BrokerCacheOptions, BrokerLockOptions } from '../broker.js'; -import { Type } from '@deepkit/type'; -import { ProcessLock } from '@deepkit/core'; +import { BrokerDeepkitAdapter } from './deepkit-adapter.js'; +import { BrokerKernel } from '../kernel.js'; +import { RpcDirectClientAdapter } from '@deepkit/rpc'; -export class BrokerMemoryAdapter implements BrokerAdapter { - protected cache: { [key: string]: any } = {}; - protected channels: { [key: string]: ((m: any) => void)[] } = {}; - protected locks: { [key: string]: ProcessLock } = {}; - async disconnect(): Promise { - } - - async lock(id: string, options: BrokerLockOptions): Promise { - const lock = new ProcessLock(id); - await lock.acquire(options.ttl, options.timeout); - this.locks[id] = lock; - } - - async tryLock(id: string, options: BrokerLockOptions): Promise { - const lock = new ProcessLock(id); - if (lock.tryLock(options.ttl)) { - this.locks[id] = lock; - return true; - } - return false; - } - - async release(id: string): Promise { - if (this.locks[id]) { - this.locks[id].unlock(); - delete this.locks[id]; - } - } - - async getCache(key: string): Promise { - return this.cache[key]; - } - - async setCache(key: string, value: any, options: BrokerCacheOptions) { - this.cache[key] = value; - } - - async increase(key: string, value: any): Promise { - if (!(key in this.cache)) this.cache[key] = 0; - this.cache[key] += value; - } - - async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<{ unsubscribe: () => Promise }> { - if (!(key in this.channels)) this.channels[key] = []; - const fn = (m: any) => { - callback(m); - }; - this.channels[key].push(fn); - - return { - unsubscribe: async () => { - const index = this.channels[key].indexOf(fn); - if (index !== -1) this.channels[key].splice(index, 1); - } - }; - } - - async publish(key: string, message: T): Promise { - if (!(key in this.channels)) return; - for (const callback of this.channels[key]) { - callback(message); - } +export class BrokerMemoryAdapter extends BrokerDeepkitAdapter { + constructor() { + const kernel = new BrokerKernel(); + const client = new RpcDirectClientAdapter(kernel); + super(client); } } diff --git a/packages/broker/src/broker.ts b/packages/broker/src/broker.ts index f86755c04..1e2876558 100644 --- a/packages/broker/src/broker.ts +++ b/packages/broker/src/broker.ts @@ -17,22 +17,26 @@ export interface BrokerLockOptions { timeout: number; } -export interface BrokerAdapter { - lock(id: string, options: BrokerLockOptions): Promise; +export type Release = () => Promise; - tryLock(id: string, options: BrokerLockOptions): Promise; +export interface BrokerAdapter { + lock(id: string, options: BrokerLockOptions): Promise; - release(id: string): Promise; + tryLock(id: string, options: BrokerLockOptions): Promise; getCache(key: string, type: Type): Promise; setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise; - increase(key: string, value: any): Promise; + increment(key: string, value: any): Promise; + + publish(name: string, message: any, type: Type): Promise; - publish(key: string, message: any, type: Type): Promise; + subscribe(name: string, callback: (message: any) => void, type: Type): Promise; - subscribe(key: string, callback: (message: any) => void, type: Type): Promise<{ unsubscribe: () => Promise }>; + consume(name: string, callback: (message: any) => Promise, options: { maxParallel: number }, type: Type): Promise; + + produce(name: string, message: any, type: Type, options?: { delay?: number, priority?: number }): Promise; disconnect(): Promise; } @@ -47,26 +51,76 @@ export interface BrokerCacheOptions { export class CacheError extends Error { } -export type BrokerBusChannel = [Channel, Parameters, Type]; +export type BrokerBusChannel = [Name, Parameters, Type]; export type BrokerCacheKey = [Key, Parameters, Type]; +export type BrokerQueueChannel = [Name, Type]; + export type CacheBuilder> = (parameters: T[1], options: BrokerCacheOptions) => T[2] | Promise; +export class BrokerQueueMessage { + public state: 'pending' | 'done' | 'failed' = 'pending'; + public error?: Error; + + public tries: number = 0; + public delayed: number = 0; + + constructor( + public channel: string, + public data: T, + ) { + } + + public failed(error: Error) { + this.state = 'failed'; + this.error = error; + } + + public delay(seconds: number) { + this.delayed = seconds; + } +} + + +export class BrokerQueue { + constructor( + public name: string, + private adapter: BrokerAdapter, + private type: Type, + ) { + } + + async produce(message: T, options?: { delay?: number, priority?: number }): Promise { + await this.adapter.produce(this.name, message, this.type, options); + } + + async consume(callback: (message: BrokerQueueMessage) => Promise | void, options: { maxParallel?: number } = {}): Promise { + return await this.adapter.consume(this.name, async (message) => { + try { + await callback(message); + } catch (error: any) { + message.state = 'failed'; + message.error = error; + } + }, Object.assign({maxParallel: 1}, options), this.type); + } +} + export class BrokerBus { constructor( - private channel: string, + public name: string, private adapter: BrokerAdapter, private type: Type, ) { } async publish(message: T) { - return this.adapter.publish(this.channel, message, this.type); + return this.adapter.publish(this.name, message, this.type); } - async subscribe(callback: (message: T) => void): Promise<{ unsubscribe: () => Promise }> { - return this.adapter.subscribe(this.channel, callback, this.type); + async subscribe(callback: (message: T) => void): Promise { + return this.adapter.subscribe(this.name, callback, this.type); } } @@ -94,9 +148,9 @@ export class BrokerCache> { await this.adapter.setCache(cacheKey, value, { ...this.options, ...options }, this.type); } - async increase(parameters: T[1], value: number) { + async increment(parameters: T[1], value: number) { const cacheKey = this.getCacheKey(parameters); - await this.adapter.increase(cacheKey, value); + await this.adapter.increment(cacheKey, value); } async get(parameters: T[1]): Promise { @@ -113,7 +167,7 @@ export class BrokerCache> { } export class BrokerLock { - public acquired: boolean = false; + protected releaser?: Release; constructor( private id: string, @@ -122,20 +176,24 @@ export class BrokerLock { ) { } + get acquired(): boolean { + return this.releaser !== undefined; + } + async acquire(): Promise { - await this.adapter.lock(this.id, this.options); - this.acquired = true; + this.releaser = await this.adapter.lock(this.id, this.options); } async try(): Promise { if (this.acquired) return true; - - return this.acquired = await this.adapter.tryLock(this.id, this.options); + this.releaser = await this.adapter.tryLock(this.id, this.options); + return this.acquired; } async release(): Promise { - this.acquired = false; - await this.adapter.release(this.id); + if (!this.releaser) return; + await this.releaser(); + this.releaser = undefined; } } @@ -146,7 +204,7 @@ export class Broker { } public lock(id: string, options: Partial = {}): BrokerLock { - return new BrokerLock(id, this.adapter, Object.assign({ ttl: 60*2, timeout: 30 }, options)); + return new BrokerLock(id, this.adapter, Object.assign({ ttl: 60 * 2, timeout: 30 }, options)); } public disconnect(): Promise { @@ -199,11 +257,14 @@ export class Broker { if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`); if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`); const path = String(type.types[0].type.literal); - return new BrokerBus(path, this.adapter, type.types[2].type); } - public queue(channel: string, type?: ReceiveType) { - + public queue>(type?: ReceiveType): BrokerQueue { + type = resolveReceiveType(type); + if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`); + if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`); + const name = String(type.types[0].type.literal); + return new BrokerQueue(name, this.adapter, type.types[1].type); } } diff --git a/packages/broker/src/client.ts b/packages/broker/src/client.ts index 0cd8ec5de..b14a2b219 100644 --- a/packages/broker/src/client.ts +++ b/packages/broker/src/client.ts @@ -20,12 +20,12 @@ import { brokerIncrement, brokerLock, brokerLockId, - brokerPublish, + brokerBusPublish, brokerResponseIncrement, brokerResponseIsLock, - brokerResponseSubscribeMessage, + brokerBusResponseHandleMessage, brokerSet, - brokerSubscribe, + brokerBusSubscribe, BrokerType } from './model.js'; import { ReceiveType, ReflectionClass, ReflectionKind, resolveReceiveType, Type, TypePropertySignature } from '@deepkit/type'; @@ -61,7 +61,7 @@ export class BrokerChannel { const serializer = getBSONSerializer(undefined, this.type); const v = this.wrapped ? serializer({ v: data }) : serializer(data); - await this.client.sendMessage(BrokerType.Publish, { c: this.channel, v: v }) + await this.client.sendMessage(BrokerType.Publish, { c: this.channel, v: v }) .ackThenClose(); return undefined; @@ -88,7 +88,7 @@ export class BrokerChannel { this.callbacks.push(parsedCallback); if (this.listener === 1) { - await this.client.sendMessage(BrokerType.Subscribe, { c: this.channel }) + await this.client.sendMessage(BrokerType.Subscribe, { c: this.channel }) .ackThenClose(); } @@ -96,7 +96,7 @@ export class BrokerChannel { this.listener--; arrayRemoveItem(this.callbacks, parsedCallback); if (this.listener === 0) { - await this.client.sendMessage(BrokerType.Unsubscribe, { c: this.channel }) + await this.client.sendMessage(BrokerType.Unsubscribe, { c: this.channel }) .ackThenClose(); } }); @@ -188,7 +188,7 @@ export class BrokerClient extends RpcBaseClient { this.knownEntityFields.set(fields.name, fields.fields); this.transporter.send(createRpcMessage(message.id, BrokerType.Ack, undefined, RpcMessageRouteType.server)); } else if (message.type === BrokerType.ResponseSubscribeMessage) { - const body = message.parseBody(); + const body = message.parseBody(); const channel = this.activeChannels.get(body.c); if (!channel) return; channel.next(body.v); diff --git a/packages/broker/src/kernel.ts b/packages/broker/src/kernel.ts index 81ee86476..5d5a7787d 100644 --- a/packages/broker/src/kernel.ts +++ b/packages/broker/src/kernel.ts @@ -11,20 +11,36 @@ import { arrayRemoveItem, ProcessLock, ProcessLocker } from '@deepkit/core'; import { createRpcMessage, RpcConnectionWriter, RpcKernel, RpcKernelBaseConnection, RpcKernelConnections, RpcMessage, RpcMessageBuilder, RpcMessageRouteType } from '@deepkit/rpc'; import { + brokerBusPublish, + brokerBusResponseHandleMessage, + brokerBusSubscribe, brokerDelete, brokerEntityFields, brokerGet, brokerIncrement, brokerLock, brokerLockId, - brokerPublish, + BrokerQueueMessageHandled, + BrokerQueuePublish, + BrokerQueueResponseHandleMessage, + BrokerQueueSubscribe, brokerResponseIncrement, brokerResponseIsLock, - brokerResponseSubscribeMessage, brokerSet, - brokerSubscribe, - BrokerType + BrokerType, + QueueMessage, + QueueMessageState } from './model.js'; +import cluster from 'cluster'; +import { closeSync, openSync, renameSync, writeSync } from 'fs'; +import { snapshotState } from './snaptshot.js'; + +export interface Queue { + currentId: number; + name: string; + messages: QueueMessage[]; + consumers: { con: BrokerConnection, handling: QueueMessage[], maxMessagesInParallel: number }[]; +} export class BrokerConnection extends RpcKernelBaseConnection { protected subscribedChannels: string[] = []; @@ -48,7 +64,6 @@ export class BrokerConnection extends RpcKernelBaseConnection { for (const lock of this.locks.values()) { lock.unlock(); } - } protected async sendEntityFields(name: string) { @@ -135,22 +150,46 @@ export class BrokerConnection extends RpcKernelBaseConnection { }); break; } + case BrokerType.QueuePublish: { + const body = message.parseBody(); + this.state.queuePublish(body.c, body.v, body.delay, body.priority); + response.ack(); + break; + } + case BrokerType.QueueSubscribe: { + const body = message.parseBody(); + this.state.queueSubscribe(body.c, this, body.maxParallel); + response.ack(); + break; + } + case BrokerType.QueueUnsubscribe: { + const body = message.parseBody(); + this.state.queueUnsubscribe(body.c, this); + response.ack(); + break; + } + case BrokerType.QueueMessageHandled: { + const body = message.parseBody(); + this.state.queueMessageHandled(body.c, this, body.id, { error: body.error, success: body.success, delay: body.delay }); + response.ack(); + break; + } case BrokerType.Subscribe: { - const body = message.parseBody(); + const body = message.parseBody(); this.state.subscribe(body.c, this); this.subscribedChannels.push(body.c); response.ack(); break; } case BrokerType.Unsubscribe: { - const body = message.parseBody(); + const body = message.parseBody(); this.state.unsubscribe(body.c, this); arrayRemoveItem(this.subscribedChannels, body.c); response.ack(); break; } case BrokerType.Publish: { - const body = message.parseBody(); + const body = message.parseBody(); this.state.publish(body.c, body.v); response.ack(); break; @@ -187,8 +226,40 @@ export class BrokerState { public subscriptions = new Map(); public entityFields = new Map>(); + public queues = new Map(); + public locker = new ProcessLocker(); + public enableSnapshot = false; + public snapshotInterval = 15; + public snapshotPath = './broker-snapshot.bson'; + public snapshotting = false; + + protected lastSnapshotTimeout?: any; + + protected snapshot() { + if (cluster.isMaster) { + this.snapshotting = true; + cluster.fork(); + + cluster.on('exit', (worker) => { + this.snapshotting = false; + }); + return; + } + + //we are in the worker now + + const snapshotTempPath = this.snapshotPath + '.tmp'; + //open file for writing, create if not exists, truncate if exists + const file = openSync(snapshotTempPath, 'w+'); + snapshotState(this, (v) => writeSync(file, v)); + closeSync(file); + + //rename temp file to final file + renameSync(snapshotTempPath, this.snapshotPath); + } + public getEntityFields(name: string): string[] { return Array.from(this.entityFields.get(name)?.keys() || []); } @@ -260,7 +331,7 @@ export class BrokerState { public publish(channel: string, v: Uint8Array) { const subscriptions = this.subscriptions.get(channel); if (!subscriptions) return; - const message = createRpcMessage( + const message = createRpcMessage( 0, BrokerType.ResponseSubscribeMessage, { c: channel, v: v }, RpcMessageRouteType.server ); @@ -270,6 +341,77 @@ export class BrokerState { } } + protected getQueue(queueName: string) { + let queue = this.queues.get(queueName); + if (!queue) { + queue = { currentId: 0, name: queueName, messages: [], consumers: [] }; + this.queues.set(queueName, queue); + } + return queue; + } + + public queueSubscribe(queueName: string, connection: BrokerConnection, maxParallel: number) { + const queue = this.getQueue(queueName); + queue.consumers.push({ con: connection, handling: [], maxMessagesInParallel: 1 }); + } + + public queueUnsubscribe(queueName: string, connection: BrokerConnection) { + const queue = this.getQueue(queueName); + const index = queue.consumers.findIndex(v => v.con === connection); + if (index === -1) return; + queue.consumers.splice(index, 1); + } + + public queuePublish(queueName: string, v: Uint8Array, delay?: number, priority?: number) { + const queue = this.getQueue(queueName); + + const m: QueueMessage = { id: queue.currentId++, state: QueueMessageState.pending, tries: 0, v, delay: delay || 0, priority }; + queue.messages.push(m); + + if (m.delay > Date.now()) { + // todo: how to handle delay? many timeouts or one timeout? + return; + } + + for (const consumer of queue.consumers) { + if (consumer.handling.length >= consumer.maxMessagesInParallel) continue; + consumer.handling.push(m); + m.tries++; + m.state = QueueMessageState.inFlight; + m.lastError = undefined; + consumer.con.writer.write(createRpcMessage( + 0, BrokerType.QueueResponseHandleMessage, + { c: queueName, v, id: m.id }, RpcMessageRouteType.server + )); + } + + //todo: handle queues messages and sending when new consumer connects + } + + /** + * When a queue message has been sent to a consumer and the consumer answers. + */ + public queueMessageHandled(queueName: string, connection: BrokerConnection, id: number, answer: { error?: string, success: boolean, delay?: number }) { + const queue = this.queues.get(queueName); + if (!queue) return; + const consumer = queue.consumers.find(v => v.con === connection); + if (!consumer) return; + const messageIdx = consumer.handling.findIndex(v => v.id === id); + if (messageIdx === -1) return; + const message = consumer.handling[messageIdx]; + consumer.handling.splice(messageIdx, 1); + + if (answer.error) { + message.state = QueueMessageState.error; + } else if (answer.success) { + message.state = QueueMessageState.done; + } else if (answer.delay) { + message.delay = Date.now() + answer.delay; + } + + //todo: handle delays and retries + } + public set(id: string, data: Uint8Array) { this.setStore.set(id, data); } diff --git a/packages/broker/src/model.ts b/packages/broker/src/model.ts index 8be0443b6..ecd0b2bcd 100644 --- a/packages/broker/src/model.ts +++ b/packages/broker/src/model.ts @@ -8,7 +8,7 @@ * You should have received a copy of the MIT License along with this program. */ -export enum BrokerType { +export const enum BrokerType { //the first 100 are reserved Ack, Error, @@ -34,6 +34,12 @@ export enum BrokerType { ResponseLockFailed, ResponseIsLock, + QueuePublish, + QueueSubscribe, + QueueUnsubscribe, + QueueResponseHandleMessage, + QueueMessageHandled, + PublishEntityFields, //internal set of fields will be set. if changed, it will be broadcasted to each connected client UnsubscribeEntityFields, //when fields set changes, the new set will be broadcasted to each connected client AllEntityFields, //clients requests all available entity-fields @@ -67,20 +73,51 @@ export interface brokerGet { n: string; } -export interface brokerPublish { +export interface brokerBusPublish { c: string, v: Uint8Array, } -export interface brokerSubscribe { +export interface brokerBusSubscribe { c: string; } -export interface brokerResponseSubscribeMessage { +export interface brokerBusResponseHandleMessage { c: string, v: Uint8Array, } +export interface BrokerQueuePublish { + c: string; + delay?: number; + priority?: number; + v: Uint8Array; +} + +export interface BrokerQueueSubscribe { + c: string; + maxParallel: number; +} + +export interface BrokerQueueUnsubscribe { + c: string; +} + +export interface BrokerQueueResponseHandleMessage { + c: string; + id: number; + v: Uint8Array; +} + +// consumer handled the message and sends back the result +export interface BrokerQueueMessageHandled { + c: string; + id: number; + success: boolean; + error?: string; + delay?: number; +} + export interface brokerLockId { id: string; } @@ -99,3 +136,31 @@ export interface brokerEntityFields { name: string, fields: string[], } + +export enum SnapshotEntryType { + queue, +} + +export type SnapshotEntry = { + type: SnapshotEntryType.queue, + currentId: number; + name: string; + amount: number; +} + +export enum QueueMessageState { + pending, + inFlight, + done, + error, +} + +export interface QueueMessage { + id: number; + state: QueueMessageState; + delay: number; + priority?: number; + lastError?: string; + tries: number; + v: Uint8Array; +} diff --git a/packages/broker/src/snaptshot.ts b/packages/broker/src/snaptshot.ts new file mode 100644 index 000000000..b3b1af79e --- /dev/null +++ b/packages/broker/src/snaptshot.ts @@ -0,0 +1,67 @@ +import { getBSONDeserializer, getBSONSerializer } from '@deepkit/bson'; +import { BrokerState, Queue } from './kernel.js'; +import { QueueMessage, SnapshotEntry, SnapshotEntryType } from './model.js'; + +export function snapshotState(state: BrokerState, writer: (v: Uint8Array) => void) { + + const serializeEntry = getBSONSerializer(); + const serializeMessage = getBSONSerializer(); + + for (const queue of state.queues.values()) { + const q: SnapshotEntry = { + currentId: queue.currentId, + type: SnapshotEntryType.queue, + name: queue.name, + amount: queue.messages.length, + }; + + const bson = serializeEntry(q); + writer(bson); + + for (const message of queue.messages) { + writer(serializeMessage(message)); + } + } +} + +function ensureDocumentIsInBuffer(buffer: Buffer, reader: (size: number) => Uint8Array): [Buffer, number] { + let documentSize: number = buffer.byteLength >= 4 ? buffer.readUInt32LE(0) : 0; + while (documentSize === 0 || buffer.byteLength < documentSize) { + buffer = Buffer.concat([buffer, reader(documentSize ? documentSize - buffer.byteLength : 32)]); + if (buffer.byteLength === 0) return [buffer, 0]; + if (documentSize === 0 && buffer.byteLength >= 4) { + documentSize = buffer.readUInt32LE(0); + } + } + return [buffer, documentSize]; +} + +export function restoreState(state: BrokerState, reader: (size: number) => Uint8Array) { + const deserializeEntry = getBSONDeserializer(); + const deserializeMessage = getBSONDeserializer(); + + while (true) { + let [buffer, documentSize] = ensureDocumentIsInBuffer(Buffer.alloc(0), reader); + if (!documentSize) return; + const entry = deserializeEntry(buffer.subarray(0, documentSize)); + buffer = buffer.subarray(documentSize); + + const queue: Queue = { + currentId: entry.currentId, + name: entry.name, + messages: [], + consumers: [], + }; + + state.queues.set(queue.name, queue); + + for (let i = 0; i < entry.amount; i++) { + [buffer, documentSize] = ensureDocumentIsInBuffer(buffer, reader); + if (documentSize === 0) return; + const message = deserializeMessage(buffer); + buffer = buffer.subarray(documentSize); + queue.messages.push(message); + } + } + +} diff --git a/packages/broker/tests/broker.spec.ts b/packages/broker/tests/broker.spec.ts index d2cf8dc12..17b1aeab5 100644 --- a/packages/broker/tests/broker.spec.ts +++ b/packages/broker/tests/broker.spec.ts @@ -1,8 +1,8 @@ import { expect, jest, test } from '@jest/globals'; -import { Broker, BrokerAdapter, BrokerBusChannel, BrokerCacheKey } from '../src/broker.js'; +import { Broker, BrokerAdapter, BrokerBusChannel, BrokerCacheKey, BrokerQueueChannel } from '../src/broker.js'; import { BrokerMemoryAdapter } from '../src/adapters/memory-adapter.js'; -jest.setTimeout(30000); +jest.setTimeout(10000); export let adapterFactory: () => Promise = async () => new BrokerMemoryAdapter(); @@ -10,7 +10,7 @@ export function setAdapterFactory(factory: () => Promise) { adapterFactory = factory; } -test('cache api', async () => { +test('cache', async () => { const broker = new Broker(await adapterFactory()); type User = { id: number, username: string }; @@ -42,7 +42,7 @@ test('cache api', async () => { } }); -test('bus api', async () => { +test('bus', async () => { const broker = new Broker(await adapterFactory()); type Events = { type: 'user-created', id: number } | { type: 'user-deleted', id: number }; @@ -57,7 +57,7 @@ test('bus api', async () => { await channel.publish({ type: 'user-created', id: 2 }); }); -test('lock api', async () => { +test('lock', async () => { const broker = new Broker(await adapterFactory()); const lock1 = broker.lock('my-lock', { ttl: 1000 }); @@ -72,3 +72,23 @@ test('lock api', async () => { await lock1.release(); expect(lock1.acquired).toBe(false); }); + +test('queue', async () => { + const broker = new Broker(await adapterFactory()); + + type User = { id: number, username: string }; + type QueueA = BrokerQueueChannel; + + const queue = broker.queue(); + + const p = new Promise(async (resolve) => { + await queue.consume(async (message) => { + console.log(message); + resolve(message.data); + }); + }); + + await queue.produce({ id: 3, username: 'peter' }); + + expect(await p).toEqual({ id: 3, username: 'peter' }); +}); diff --git a/packages/broker/tests/deepkit-broker.spec.ts b/packages/broker/tests/deepkit-broker.spec.ts new file mode 100644 index 000000000..e53753e34 --- /dev/null +++ b/packages/broker/tests/deepkit-broker.spec.ts @@ -0,0 +1,21 @@ +import { test } from '@jest/globals'; +import { setAdapterFactory } from './broker.spec.js'; +import { BrokerDeepkitAdapter } from '../src/adapters/deepkit-adapter.js'; +import { BrokerKernel } from '../src/kernel.js'; +import { RpcDirectClientAdapter } from '@deepkit/rpc'; + + +setAdapterFactory(async () => { + const kernel = new BrokerKernel(); + const client = new RpcDirectClientAdapter(kernel); + return new BrokerDeepkitAdapter(client); +}); + +// since we import broker.spec.js, all its tests are scheduled to run +// we define 'basic' here too, so we can easily run just this test. +// also necessary to have at least once test in this file, so that WebStorm +// detects the file as a test file. +test('cache', () => {}); +test('bus', () => {}); +test('lock', () => {}); +test('queue', () => {}); diff --git a/packages/broker/tests/snapshot.spec.ts b/packages/broker/tests/snapshot.spec.ts new file mode 100644 index 000000000..92d06b016 --- /dev/null +++ b/packages/broker/tests/snapshot.spec.ts @@ -0,0 +1,45 @@ +import { expect, test } from '@jest/globals'; +import { BrokerState } from '../src/kernel.js'; +import { restoreState, snapshotState } from '../src/snaptshot.js'; +import { QueueMessageState } from '../src/model.js'; + +test('snapshot', () => { + const state = new BrokerState(); + + state.queues.set('test', { + currentId: 2, + messages: [ + { id: 1, tries: 1, state: QueueMessageState.inFlight, v: new Uint8Array([1, 2, 3]), lastError: 'error', delay: 0 }, + { id: 2, tries: 0, state: QueueMessageState.pending, v: new Uint8Array([3, 3, 3]), delay: 0 }, + ], + consumers: [], + name: 'test', + }); + + state.queues.set('test2', { + currentId: 2, + messages: [ + { id: 1, tries: 0, state: QueueMessageState.pending, v: new Uint8Array([5, 5, 5]), delay: 0 }, + { id: 2, tries: 0, state: QueueMessageState.pending, v: new Uint8Array([4, 4, 4]), delay: 0 }, + ], + consumers: [], + name: 'test2', + }); + + const chunks: Uint8Array[] = []; + snapshotState(state, (v) => { + chunks.push(v); + }); + + const buffer = Buffer.concat(chunks); + + const newState = new BrokerState(); + let offset = 0; + restoreState(newState, (size: number) => { + const res = buffer.subarray(offset, offset + size); + offset += res.byteLength; + return res; + }); + + expect(newState.queues).toEqual(state.queues); +});