Skip to content

Commit

Permalink
feat(server): allow prefetch doc stats before sync (#6115)
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Mar 14, 2024
1 parent 7fdb1f2 commit 79ffca3
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 24 deletions.
97 changes: 79 additions & 18 deletions packages/backend/server/src/core/doc/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
update: Buffer,
retryTimes = 10
) {
await new Promise<void>((resolve, reject) => {
const timestamp = await new Promise<number>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid);
await this.db.update.create({
const { createdAt } = await this.db.update.create({
select: {
seq: true,
createdAt: true,
},
data: {
workspaceId,
Expand All @@ -243,23 +243,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
blob: update,
},
});

return createdAt.getTime();
})
.pipe(retry(retryTimes)) // retry until seq num not conflict
.subscribe({
next: () => {
next: timestamp => {
this.logger.debug(
`pushed 1 update for ${guid} in workspace ${workspaceId}`
);
resolve();
resolve(timestamp);
},
error: e => {
this.logger.error('Failed to push updates', e);
reject(new Error('Failed to push update'));
},
});
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, 1);
});

await this.updateCachedUpdatesCount(workspaceId, guid, 1);

return timestamp;
}

async batchPush(
Expand All @@ -268,24 +272,34 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
updates: Buffer[],
retryTimes = 10
) {
const lastSeq = await this.getUpdateSeq(workspaceId, guid, updates.length);
const now = Date.now();
let timestamp = now;
await new Promise<void>((resolve, reject) => {
defer(async () => {
const seq = await this.getUpdateSeq(workspaceId, guid, updates.length);
let turn = 0;
const batchCount = 10;
for (const batch of chunk(updates, batchCount)) {
await this.db.update.createMany({
data: batch.map((update, i) => ({
workspaceId,
id: guid,
data: batch.map((update, i) => {
const subSeq = turn * batchCount + i + 1;
// `seq` is the last seq num of the batch
// example for 11 batched updates, start from seq num 20
// seq for first update in the batch should be:
// 31 - 11 + 0 * 10 + 0 + 1 = 21
// ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
seq: seq - updates.length + turn * batchCount + i + 1,
blob: update,
})),
// 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21
// ^ last seq num ^ updates.length ^ turn ^ batchCount ^i
const seq = lastSeq - updates.length + subSeq;
const createdAt = now + subSeq;
timestamp = Math.max(timestamp, createdAt);

return {
workspaceId,
id: guid,
blob: update,
seq,
createdAt: new Date(createdAt), // make sure the updates can be ordered by create time
};
}),
});
turn++;
}
Expand All @@ -303,9 +317,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
reject(new Error('Failed to push update'));
},
});
}).then(() => {
return this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
});
await this.updateCachedUpdatesCount(workspaceId, guid, updates.length);

return timestamp;
}

/**
* Get latest timestamp of all docs in the workspace.
*/
@CallTimer('doc', 'get_stats')
async getStats(workspaceId: string, after: number | undefined = 0) {
const snapshots = await this.db.snapshot.findMany({
where: {
workspaceId,
updatedAt: {
gt: new Date(after),
},
},
select: {
id: true,
updatedAt: true,
},
});

const updates = await this.db.update.groupBy({
where: {
workspaceId,
createdAt: {
gt: new Date(after),
},
},
by: ['id'],
_max: {
createdAt: true,
},
});

const result: Record<string, number> = {};

snapshots.forEach(s => {
result[s.id] = s.updatedAt.getTime();
});

updates.forEach(u => {
if (u._max.createdAt) {
result[u.id] = u._max.createdAt.getTime();
}
});

return result;
}

/**
Expand Down
36 changes: 30 additions & 6 deletions packages/backend/server/src/core/sync/events/events.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type EventResponse<Data = any> =
});

@WebSocketGateway({
cors: process.env.NODE_ENV !== 'production',
cors: !AFFiNE.node.prod,
transports: ['websocket'],
// see: https://socket.io/docs/v4/server-options/#maxhttpbuffersize
maxHttpBufferSize: 1e8, // 100 MB
Expand Down Expand Up @@ -251,6 +251,25 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
}
}

@SubscribeMessage('client-pre-sync')
async loadDocStats(
@ConnectedSocket() client: Socket,
@MessageBody()
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
): Promise<EventResponse<Record<string, number>>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}

const stats = await this.docManager.getStats(workspaceId, timestamp);

return {
data: stats,
};
}

@SubscribeMessage('client-update-v2')
async handleClientUpdateV2(
@MessageBody()
Expand All @@ -264,24 +283,29 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
updates: string[];
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true }>> {
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}

const docId = new DocID(guid, workspaceId);
const buffers = updates.map(update => Buffer.from(update, 'base64'));
const timestamp = await this.docManager.batchPush(
docId.workspace,
docId.guid,
buffers
);

client
.to(`${docId.workspace}:sync`)
.emit('server-updates', { workspaceId, guid, updates });

const buffers = updates.map(update => Buffer.from(update, 'base64'));
.emit('server-updates', { workspaceId, guid, updates, timestamp });

await this.docManager.batchPush(docId.workspace, docId.guid, buffers);
return {
data: {
accepted: true,
timestamp,
},
};
}
Expand Down

0 comments on commit 79ffca3

Please sign in to comment.