Skip to content

Commit

Permalink
feat(server): cleanup gateway code
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Mar 14, 2024
1 parent 64728d6 commit e00d339
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 163 deletions.
242 changes: 101 additions & 141 deletions packages/backend/server/src/core/sync/events/events.gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,21 @@ export const GatewayErrorWrapper = (): MethodDecorator => {
return desc;
}

desc.value = function (...args: any[]) {
let result: any;
desc.value = async function (...args: any[]) {
try {
result = originalMethod.apply(this, args);
return await originalMethod.apply(this, args);
} catch (e) {
metrics.socketio.counter('unhandled_errors').add(1);
return {
error: new InternalError(e as Error),
};
}

if (result instanceof Promise) {
return result.catch(e => {
if (e instanceof EventError) {
return {
error: e,
};
} else {
metrics.socketio.counter('unhandled_errors').add(1);
new Logger('EventsGateway').error(e, e.stack);
new Logger('EventsGateway').error(e, (e as Error).stack);
return {
error: new InternalError(e),
error: new InternalError(e as Error),
};
});
} else {
return result;
}
}
};

Expand All @@ -85,6 +79,14 @@ type EventResponse<Data = any> =
data: Data;
});

function Sync(workspaceId: string): `${string}:sync` {
return `${workspaceId}:sync`;
}

function Awareness(workspaceId: string): `${string}:awareness` {
return `${workspaceId}:awareness`;
}

@WebSocketGateway({
cors: !AFFiNE.node.prod,
transports: ['websocket'],
Expand Down Expand Up @@ -113,7 +115,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
metrics.socketio.gauge('realtime_connections').record(this.connectionCount);
}

checkVersion(client: Socket, version?: string) {
assertVersion(client: Socket, version?: string) {
if (
// @todo(@darkskygit): remove this flag after 0.12 goes stable
AFFiNE.featureFlags.syncClientVersionCheck &&
Expand All @@ -126,14 +128,48 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
version ? ` ${version}` : ''
} is outdated, please update to ${AFFiNE.version}`,
});
return {
error: new EventError(
EventErrorCode.VERSION_REJECTED,
`Client version ${version} is outdated, please update to ${AFFiNE.version}`
),
};

throw new EventError(
EventErrorCode.VERSION_REJECTED,
`Client version ${version} is outdated, please update to ${AFFiNE.version}`
);
}
}

async joinWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.join(room);
}

async leaveWorkspace(
client: Socket,
room: `${string}:${'sync' | 'awareness'}`
) {
await client.leave(room);
}

assertInWorkspace(client: Socket, room: `${string}:${'sync' | 'awareness'}`) {
if (!client.rooms.has(room)) {
throw new NotInWorkspaceError(room);
}
}

async assertWorkspaceAccessible(
workspaceId: string,
userId: string,
permission: Permission = Permission.Read
) {
if (
!(await this.permissions.isWorkspaceMember(
workspaceId,
userId,
permission
))
) {
throw new AccessDeniedError(workspaceId);
}
return null;
}

@Auth()
Expand All @@ -144,29 +180,19 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody('version') version: string | undefined,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
const versionError = this.checkVersion(client, version);
if (versionError) {
return versionError;
}

const canWrite = await this.permissions.tryCheckWorkspace(
this.assertVersion(client, version);
await this.assertWorkspaceAccessible(
workspaceId,
user.id,
Permission.Write
);

if (canWrite) {
await client.join(`${workspaceId}:sync`);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
}
await this.joinWorkspace(client, Sync(workspaceId));
return {
data: {
clientId: client.id,
},
};
}

@Auth()
Expand All @@ -177,47 +203,18 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody('version') version: string | undefined,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
const versionError = this.checkVersion(client, version);
if (versionError) {
return versionError;
}

const canWrite = await this.permissions.tryCheckWorkspace(
this.assertVersion(client, version);
await this.assertWorkspaceAccessible(
workspaceId,
user.id,
Permission.Write
);

if (canWrite) {
await client.join(`${workspaceId}:awareness`);
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new AccessDeniedError(workspaceId),
};
}
}

/**
* @deprecated use `client-handshake-sync` and `client-handshake-awareness` instead
*/
@Auth()
@SubscribeMessage('client-handshake')
async handleClientHandShake(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
const versionError = this.checkVersion(client);
if (versionError) {
return versionError;
}
// should unreachable
await this.joinWorkspace(client, Awareness(workspaceId));
return {
error: new AccessDeniedError(workspaceId),
data: {
clientId: client.id,
},
};
}

Expand All @@ -226,29 +223,19 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${workspaceId}:sync`)) {
await client.leave(`${workspaceId}:sync`);
return {};
} else {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
this.assertInWorkspace(client, Sync(workspaceId));
await this.leaveWorkspace(client, Sync(workspaceId));
return {};
}

@SubscribeMessage('client-leave-awareness')
async handleLeaveAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${workspaceId}:awareness`)) {
await client.leave(`${workspaceId}:awareness`);
return {};
} else {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
this.assertInWorkspace(client, Awareness(workspaceId));
await this.leaveWorkspace(client, Awareness(workspaceId));
return {};
}

@SubscribeMessage('client-pre-sync')
Expand All @@ -257,11 +244,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
@MessageBody()
{ workspaceId, timestamp }: { workspaceId: string; timestamp?: number }
): Promise<EventResponse<Record<string, number>>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
this.assertInWorkspace(client, Sync(workspaceId));

const stats = await this.docManager.getStats(workspaceId, timestamp);

Expand All @@ -284,11 +267,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
},
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
this.assertInWorkspace(client, Sync(workspaceId));

const docId = new DocID(guid, workspaceId);
const buffers = updates.map(update => Buffer.from(update, 'base64'));
Expand All @@ -299,7 +278,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
);

client
.to(`${docId.workspace}:sync`)
.to(Sync(workspaceId))
.emit('server-updates', { workspaceId, guid, updates, timestamp });

return {
Expand All @@ -310,11 +289,9 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
};
}

@Auth()
@SubscribeMessage('doc-load-v2')
async loadDocV2(
@ConnectedSocket() client: Socket,
@CurrentUser() user: CurrentUser,
@MessageBody()
{
workspaceId,
Expand All @@ -326,17 +303,7 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
stateVector?: string;
}
): Promise<EventResponse<{ missing: string; state?: string }>> {
if (!client.rooms.has(`${workspaceId}:sync`)) {
const canRead = await this.permissions.tryCheckWorkspace(
workspaceId,
user.id
);
if (!canRead) {
return {
error: new AccessDeniedError(workspaceId),
};
}
}
this.assertInWorkspace(client, Sync(workspaceId));

const docId = new DocID(guid, workspaceId);
const doc = await this.docManager.get(docId.workspace, docId.guid);
Expand All @@ -363,40 +330,33 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
};
}

@Auth()
@SubscribeMessage('awareness-init')
async handleInitAwareness(
@MessageBody() workspaceId: string,
@ConnectedSocket() client: Socket
): Promise<EventResponse<{ clientId: string }>> {
if (client.rooms.has(`${workspaceId}:awareness`)) {
client.to(`${workspaceId}:awareness`).emit('new-client-awareness-init');
return {
data: {
clientId: client.id,
},
};
} else {
return {
error: new NotInWorkspaceError(workspaceId),
};
}
this.assertInWorkspace(client, Awareness(workspaceId));
client.to(Awareness(workspaceId)).emit('new-client-awareness-init');
return {
data: {
clientId: client.id,
},
};
}

@SubscribeMessage('awareness-update')
async handleHelpGatheringAwareness(
@MessageBody() message: { workspaceId: string; awarenessUpdate: string },
@MessageBody()
{
workspaceId,
awarenessUpdate,
}: { workspaceId: string; awarenessUpdate: string },
@ConnectedSocket() client: Socket
): Promise<EventResponse> {
if (client.rooms.has(`${message.workspaceId}:awareness`)) {
client
.to(`${message.workspaceId}:awareness`)
.emit('server-awareness-broadcast', message);
return {};
} else {
return {
error: new NotInWorkspaceError(message.workspaceId),
};
}
this.assertInWorkspace(client, Awareness(workspaceId));
client
.to(Awareness(workspaceId))
.emit('server-awareness-broadcast', { workspaceId, awarenessUpdate });
return {};
}
}
22 changes: 22 additions & 0 deletions packages/backend/server/src/core/workspaces/permission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,28 @@ export class PermissionService {
return this.tryCheckPage(ws, id, user);
}

/**
* Returns whether a given user is a member of a workspace and has the given or higher permission.
*/
async isWorkspaceMember(
ws: string,
user: string,
permission: Permission
): Promise<boolean> {
const count = await this.prisma.workspaceUserPermission.count({
where: {
workspaceId: ws,
userId: user,
accepted: true,
type: {
gte: permission,
},
},
});

return count !== 0;
}

async checkWorkspace(
ws: string,
user?: string,
Expand Down
Loading

0 comments on commit e00d339

Please sign in to comment.