feat(server): allow prefetch doc stats before sync #6115

Merged · 1 commit · Mar 14, 2024
97 changes: 79 additions & 18 deletions packages/backend/server/src/core/doc/manager.ts
@@ -229,12 +229,12 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
     update: Buffer,
     retryTimes = 10
   ) {
-    await new Promise<void>((resolve, reject) => {
+    const timestamp = await new Promise<number>((resolve, reject) => {
       defer(async () => {
         const seq = await this.getUpdateSeq(workspaceId, guid);
-        await this.db.update.create({
+        const { createdAt } = await this.db.update.create({
           select: {
-            seq: true,
+            createdAt: true,
           },
           data: {
             workspaceId,
@@ -243,23 +243,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
             blob: update,
           },
         });
+
+        return createdAt.getTime();
       })
         .pipe(retry(retryTimes)) // retry until seq num not conflict
         .subscribe({
-          next: () => {
+          next: timestamp => {
             this.logger.debug(
               `pushed 1 update for ${guid} in workspace ${workspaceId}`
             );
-            resolve();
+            resolve(timestamp);
           },
           error: e => {
             this.logger.error('Failed to push updates', e);
             reject(new Error('Failed to push update'));
           },
         });
-    }).then(() => {
-      return this.updateCachedUpdatesCount(workspaceId, guid, 1);
     });
+
+    await this.updateCachedUpdatesCount(workspaceId, guid, 1);
+
+    return timestamp;
   }

   async batchPush(
@@ -268,24 +272,34 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
     updates: Buffer[],
     retryTimes = 10
   ) {
+    const lastSeq = await this.getUpdateSeq(workspaceId, guid, updates.length);
+    const now = Date.now();
+    let timestamp = now;
     await new Promise<void>((resolve, reject) => {
       defer(async () => {
-        const seq = await this.getUpdateSeq(workspaceId, guid, updates.length);
         let turn = 0;
         const batchCount = 10;
         for (const batch of chunk(updates, batchCount)) {
           await this.db.update.createMany({
-            data: batch.map((update, i) => ({
-              workspaceId,
-              id: guid,
+            data: batch.map((update, i) => {
+              const subSeq = turn * batchCount + i + 1;
               // `seq` is the last seq num of the batch
               // example for 11 batched updates, start from seq num 20
               // seq for first update in the batch should be:
-              // 31 - 11 + 0 * 10 + 0 + 1 = 21
-              //  ^ last seq num  ^ updates.length  ^ turn  ^ batchCount  ^ i
-              seq: seq - updates.length + turn * batchCount + i + 1,
-              blob: update,
-            })),
+              // 31 - 11 + subSeq(0 * 10 + 0 + 1) = 21
+              //  ^ last seq num  ^ updates.length        ^ turn ^ batchCount ^ i
+              const seq = lastSeq - updates.length + subSeq;
+              const createdAt = now + subSeq;
+              timestamp = Math.max(timestamp, createdAt);
+
+              return {
+                workspaceId,
+                id: guid,
+                blob: update,
+                seq,
+                createdAt: new Date(createdAt), // make sure the updates can be ordered by create time
+              };
+            }),
           });
           turn++;
         }
@@ -303,9 +317,56 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
             reject(new Error('Failed to push update'));
           },
         });
-    }).then(() => {
-      return this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
     });
+    await this.updateCachedUpdatesCount(workspaceId, guid, updates.length);
+
+    return timestamp;
   }

+  /**
+   * Get latest timestamp of all docs in the workspace.
+   */
+  @CallTimer('doc', 'get_stats')
+  async getStats(workspaceId: string, after: number | undefined = 0) {
+    const snapshots = await this.db.snapshot.findMany({
+      where: {
+        workspaceId,
+        updatedAt: {
+          gt: new Date(after),
+        },
+      },
+      select: {
+        id: true,
+        updatedAt: true,
+      },
+    });
+
+    const updates = await this.db.update.groupBy({
+      where: {
+        workspaceId,
+        createdAt: {
+          gt: new Date(after),
+        },
+      },
+      by: ['id'],
+      _max: {
+        createdAt: true,
+      },
+    });
+
+    const result: Record<string, number> = {};
+
+    snapshots.forEach(s => {
+      result[s.id] = s.updatedAt.getTime();
+    });
+
+    updates.forEach(u => {
+      if (u._max.createdAt) {
+        result[u.id] = u._max.createdAt.getTime();
+      }
+    });
+
+    return result;
+  }
+
   /**
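Note: the seq/createdAt arithmetic in `batchPush` is easy to sanity-check in isolation. Below is a minimal standalone sketch (not the actual module) that mirrors the diff's numbering, assuming a lodash-style `chunk` and reusing the numbers from the inline comment: 11 updates whose reserved sequence range ends at 31.

// Standalone sketch of batchPush's numbering, for checking the arithmetic.
// Assumes lodash's chunk(); `lastSeq` is the last seq num reserved for the
// whole batch (what getUpdateSeq returns in the diff above).
import { chunk } from 'lodash-es';

function numberBatch(lastSeq: number, now: number, updates: Buffer[]) {
  const batchCount = 10;
  let turn = 0;
  const rows: { seq: number; createdAt: Date; blob: Buffer }[] = [];
  for (const batch of chunk(updates, batchCount)) {
    batch.forEach((blob, i) => {
      const subSeq = turn * batchCount + i + 1;
      // 11 updates ending at seq 31: the first row gets 31 - 11 + 1 = 21
      const seq = lastSeq - updates.length + subSeq;
      // offsetting createdAt by subSeq keeps rows ordered by create time
      rows.push({ seq, createdAt: new Date(now + subSeq), blob });
    });
    turn++;
  }
  return rows;
}

const rows = numberBatch(31, Date.now(), Array(11).fill(Buffer.alloc(0)));
console.assert(rows[0].seq === 21 && rows.at(-1)?.seq === 31);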
36 changes: 30 additions & 6 deletions packages/backend/server/src/core/sync/events/events.gateway.ts
@@ -86,7 +86,7 @@ type EventResponse<Data = any> =
   });

 @WebSocketGateway({
-  cors: process.env.NODE_ENV !== 'production',
+  cors: !AFFiNE.node.prod,
   transports: ['websocket'],
   // see: https://socket.io/docs/v4/server-options/#maxhttpbuffersize
   maxHttpBufferSize: 1e8, // 100 MB
@@ -251,6 +251,25 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
     }
   }

+  @SubscribeMessage('client-pre-sync')
+  async loadDocStats(
+    @ConnectedSocket() client: Socket,
+    @MessageBody()
+    { workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
+  ): Promise<EventResponse<Record<string, number>>> {
+    if (!client.rooms.has(`${workspaceId}:sync`)) {
+      return {
+        error: new NotInWorkspaceError(workspaceId),
+      };
+    }
+
+    const stats = await this.docManager.getStats(workspaceId, timestamp);
+
+    return {
+      data: stats,
+    };
+  }
+
   @SubscribeMessage('client-update-v2')
   async handleClientUpdateV2(
     @MessageBody()
@@ -264,24 +283,29 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
       updates: string[];
     },
     @ConnectedSocket() client: Socket
-  ): Promise<EventResponse<{ accepted: true }>> {
+  ): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
     if (!client.rooms.has(`${workspaceId}:sync`)) {
       return {
         error: new NotInWorkspaceError(workspaceId),
       };
     }

     const docId = new DocID(guid, workspaceId);
+    const buffers = updates.map(update => Buffer.from(update, 'base64'));
+    const timestamp = await this.docManager.batchPush(
+      docId.workspace,
+      docId.guid,
+      buffers
+    );
+
     client
       .to(`${docId.workspace}:sync`)
-      .emit('server-updates', { workspaceId, guid, updates });
-
-    const buffers = updates.map(update => Buffer.from(update, 'base64'));
+      .emit('server-updates', { workspaceId, guid, updates, timestamp });

-    await this.docManager.batchPush(docId.workspace, docId.guid, buffers);
     return {
       data: {
         accepted: true,
+        timestamp,
       },
     };
   }
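Taken together, the two handlers let a client prefetch per-doc timestamps before syncing and skip docs it already has. A rough client-side sketch follows; the socket setup, endpoint URL, ack-style response handling, and `localTimestamps` map are illustrative assumptions, while the `client-pre-sync` event name, its payload, and the `Record<string, number>` response shape come from `loadDocStats` above.

// Hypothetical client-side prefetch; only the event name and payload shape
// are taken from the handler above, the rest is illustrative.
import { io } from 'socket.io-client';

type EventResponse<T> = { data: T } | { error: { message: string } };

const socket = io('wss://example.affine.local', { transports: ['websocket'] });

async function docsNeedingSync(
  workspaceId: string,
  localTimestamps: Map<string, number>, // guid -> last timestamp seen locally
  lastSyncedAt?: number
): Promise<string[]> {
  // Ask the server for each doc's latest timestamp since `lastSyncedAt`.
  const res = (await socket
    .timeout(5000)
    .emitWithAck('client-pre-sync', {
      workspaceId,
      timestamp: lastSyncedAt,
    })) as EventResponse<Record<string, number>>;

  if ('error' in res) throw new Error(res.error.message);

  // Sync only docs whose server timestamp is newer than our local copy.
  return Object.entries(res.data)
    .filter(([guid, ts]) => ts > (localTimestamps.get(guid) ?? 0))
    .map(([guid]) => guid);
}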