From 79ffca314d27720cdac07f5a0a03a9619ac49e94 Mon Sep 17 00:00:00 2001 From: liuyi Date: Thu, 14 Mar 2024 17:34:32 +0000 Subject: [PATCH] feat(server): allow prefetch doc stats before sync (#6115) --- .../backend/server/src/core/doc/manager.ts | 97 +++++++++++++++---- .../src/core/sync/events/events.gateway.ts | 36 +++++-- 2 files changed, 109 insertions(+), 24 deletions(-) diff --git a/packages/backend/server/src/core/doc/manager.ts b/packages/backend/server/src/core/doc/manager.ts index 3d0035e1ed915..faa50ecc7dba8 100644 --- a/packages/backend/server/src/core/doc/manager.ts +++ b/packages/backend/server/src/core/doc/manager.ts @@ -229,12 +229,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { update: Buffer, retryTimes = 10 ) { - await new Promise((resolve, reject) => { + const timestamp = await new Promise((resolve, reject) => { defer(async () => { const seq = await this.getUpdateSeq(workspaceId, guid); - await this.db.update.create({ + const { createdAt } = await this.db.update.create({ select: { - seq: true, + createdAt: true, }, data: { workspaceId, @@ -243,23 +243,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { blob: update, }, }); + + return createdAt.getTime(); }) .pipe(retry(retryTimes)) // retry until seq num not conflict .subscribe({ - next: () => { + next: timestamp => { this.logger.debug( `pushed 1 update for ${guid} in workspace ${workspaceId}` ); - resolve(); + resolve(timestamp); }, error: e => { this.logger.error('Failed to push updates', e); reject(new Error('Failed to push update')); }, }); - }).then(() => { - return this.updateCachedUpdatesCount(workspaceId, guid, 1); }); + + await this.updateCachedUpdatesCount(workspaceId, guid, 1); + + return timestamp; } async batchPush( @@ -268,24 +272,34 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { updates: Buffer[], retryTimes = 10 ) { + const lastSeq = await this.getUpdateSeq(workspaceId, guid, updates.length); + const now = 
Date.now(); + let timestamp = now; await new Promise((resolve, reject) => { defer(async () => { - const seq = await this.getUpdateSeq(workspaceId, guid, updates.length); let turn = 0; const batchCount = 10; for (const batch of chunk(updates, batchCount)) { await this.db.update.createMany({ - data: batch.map((update, i) => ({ - workspaceId, - id: guid, + data: batch.map((update, i) => { + const subSeq = turn * batchCount + i + 1; // `seq` is the last seq num of the batch // example for 11 batched updates, start from seq num 20 // seq for first update in the batch should be: - // 31 - 11 + 0 * 10 + 0 + 1 = 21 - // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i - seq: seq - updates.length + turn * batchCount + i + 1, - blob: update, - })), + // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21 + // ^ last seq num ^ updates.length ^ turn ^ batchCount ^i + const seq = lastSeq - updates.length + subSeq; + const createdAt = now + subSeq; + timestamp = Math.max(timestamp, createdAt); + + return { + workspaceId, + id: guid, + blob: update, + seq, + createdAt: new Date(createdAt), // make sure the updates can be ordered by create time + }; + }), }); turn++; } @@ -303,9 +317,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { reject(new Error('Failed to push update')); }, }); - }).then(() => { - return this.updateCachedUpdatesCount(workspaceId, guid, updates.length); }); + await this.updateCachedUpdatesCount(workspaceId, guid, updates.length); + + return timestamp; + } + + /** + * Get latest timestamp of all docs in the workspace. 
+ */ + @CallTimer('doc', 'get_stats') + async getStats(workspaceId: string, after: number | undefined = 0) { + const snapshots = await this.db.snapshot.findMany({ + where: { + workspaceId, + updatedAt: { + gt: new Date(after), + }, + }, + select: { + id: true, + updatedAt: true, + }, + }); + + const updates = await this.db.update.groupBy({ + where: { + workspaceId, + createdAt: { + gt: new Date(after), + }, + }, + by: ['id'], + _max: { + createdAt: true, + }, + }); + + const result: Record<string, number> = {}; + + snapshots.forEach(s => { + result[s.id] = s.updatedAt.getTime(); + }); + + updates.forEach(u => { + if (u._max.createdAt) { + result[u.id] = u._max.createdAt.getTime(); + } + }); + + return result; } /** diff --git a/packages/backend/server/src/core/sync/events/events.gateway.ts b/packages/backend/server/src/core/sync/events/events.gateway.ts index ca7941faeb4f4..a637675f10425 100644 --- a/packages/backend/server/src/core/sync/events/events.gateway.ts +++ b/packages/backend/server/src/core/sync/events/events.gateway.ts @@ -86,7 +86,7 @@ type EventResponse = }); @WebSocketGateway({ - cors: process.env.NODE_ENV !== 'production', + cors: !AFFiNE.node.prod, transports: ['websocket'], // see: https://socket.io/docs/v4/server-options/#maxhttpbuffersize maxHttpBufferSize: 1e8, // 100 MB @@ -251,6 +251,25 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } } + @SubscribeMessage('client-pre-sync') + async loadDocStats( + @ConnectedSocket() client: Socket, + @MessageBody() + { workspaceId, timestamp }: { workspaceId: string; timestamp?: number } + ): Promise<EventResponse<Record<string, number>>> { + if (!client.rooms.has(`${workspaceId}:sync`)) { + return { + error: new NotInWorkspaceError(workspaceId), + }; + } + + const stats = await this.docManager.getStats(workspaceId, timestamp); + + return { + data: stats, + }; + } + @SubscribeMessage('client-update-v2') async handleClientUpdateV2( @MessageBody() @@ -264,7 +283,7 @@ export class EventsGateway implements OnGatewayConnection, 
OnGatewayDisconnect { updates: string[]; }, @ConnectedSocket() client: Socket - ): Promise<EventResponse<{ accepted: true }>> { + ): Promise<EventResponse<{ accepted: true; timestamp?: number }>> { if (!client.rooms.has(`${workspaceId}:sync`)) { return { error: new NotInWorkspaceError(workspaceId), @@ -272,16 +291,21 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect { } const docId = new DocID(guid, workspaceId); + const buffers = updates.map(update => Buffer.from(update, 'base64')); + const timestamp = await this.docManager.batchPush( + docId.workspace, + docId.guid, + buffers + ); + client .to(`${docId.workspace}:sync`) - .emit('server-updates', { workspaceId, guid, updates }); - - const buffers = updates.map(update => Buffer.from(update, 'base64')); + .emit('server-updates', { workspaceId, guid, updates, timestamp }); - await this.docManager.batchPush(docId.workspace, docId.guid, buffers); return { data: { accepted: true, + timestamp, }, }; }