Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve server stream typings #29128

Merged
merged 20 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/meteor/app/apps/server/bridges/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ export class AppMessageBridge extends MessageBridge {
protected async typing({ scope, id, username, isTyping }: ITypingDescriptor): Promise<void> {
switch (scope) {
case 'room':
notifications.notifyRoom(id, 'typing', username, isTyping);
notifications.notifyRoom(id, 'typing', username!, isTyping);
return;
default:
throw new Error('Unrecognized typing scope provided');
Expand Down
6 changes: 3 additions & 3 deletions apps/meteor/app/authorization/server/methods/addUserToRole.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Meteor } from 'meteor/meteor';
import type { IRole, IUser, IRoom } from '@rocket.chat/core-typings';
import type { IRole, IUser } from '@rocket.chat/core-typings';
import { Roles, Users } from '@rocket.chat/models';
import { api } from '@rocket.chat/core-services';
import type { ServerMethods } from '@rocket.chat/ui-contexts';
Expand All @@ -11,12 +11,12 @@ import { apiDeprecationLogger } from '../../../lib/server/lib/deprecationWarning
declare module '@rocket.chat/ui-contexts' {
// eslint-disable-next-line @typescript-eslint/naming-convention
interface ServerMethods {
'authorization:addUserToRole'(roleId: IRole['_id'], username: IUser['username'], scope: IRoom['_id'] | undefined): Promise<boolean>;
'authorization:addUserToRole'(roleId: IRole['_id'], username: IUser['username'], scope: string | undefined): Promise<boolean>;
}
}

Meteor.methods<ServerMethods>({
async 'authorization:addUserToRole'(roleId: IRole['_id'], username: IUser['username'], scope: IRoom['_id'] | undefined) {
async 'authorization:addUserToRole'(roleId: IRole['_id'], username: IUser['username'], scope) {
const userId = Meteor.userId();

if (!userId || !(await hasPermissionAsync(userId, 'access-permissions'))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import { apiDeprecationLogger } from '../../../lib/server/lib/deprecationWarning
declare module '@rocket.chat/ui-contexts' {
// eslint-disable-next-line @typescript-eslint/naming-convention
interface ServerMethods {
'authorization:removeUserFromRole'(roleId: IRole['_id'], username: IUser['username'], scope: undefined): Promise<boolean>;
'authorization:removeUserFromRole'(roleId: IRole['_id'], username: IUser['username'], scope?: string): Promise<boolean>;
}
}

Meteor.methods<ServerMethods>({
async 'authorization:removeUserFromRole'(roleId, username, scope) {
async 'authorization:removeUserFromRole'(roleId, username, scope = 'Users') {
ggazzo marked this conversation as resolved.
Show resolved Hide resolved
const userId = Meteor.userId();

if (!userId || !(await hasPermissionAsync(userId, 'access-permissions'))) {
Expand Down Expand Up @@ -85,7 +85,7 @@ Meteor.methods<ServerMethods>({
username,
},
scope,
};
} as const;
if (settings.get('UI_DisplayRoles')) {
void api.broadcast('user.roleUpdate', event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ Meteor.methods<ServerMethods>({

await IntegrationHistory.removeByIntegrationId(integrationId);

notifications.streamIntegrationHistory.emit(integrationId, { type: 'removed' });
notifications.streamIntegrationHistory.emit(integrationId, { type: 'removed', id: integrationId });

return true;
},
Expand Down
37 changes: 20 additions & 17 deletions apps/meteor/app/livechat/client/lib/stream/queueManager.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,24 @@
import type { ILivechatDepartment, ILivechatInquiryRecord, IOmnichannelAgent } from '@rocket.chat/core-typings';

import { APIClient } from '../../../../utils/client';
import { LivechatInquiry } from '../../collections/LivechatInquiry';
import { callWithErrorHandling } from '../../../../../client/lib/utils/callWithErrorHandling';
import { sdk } from '../../../../utils/client/lib/SDKClient';

const departments = new Set();

type ILivechatInquiryWithType = ILivechatInquiryRecord & { type?: 'added' | 'removed' | 'changed' };

const events = {
added: (inquiry: ILivechatInquiryWithType) => {
delete inquiry.type;
added: (inquiry: ILivechatInquiryRecord) => {
departments.has(inquiry.department) && LivechatInquiry.insert({ ...inquiry, alert: true, _updatedAt: new Date(inquiry._updatedAt) });
},
changed: (inquiry: ILivechatInquiryWithType) => {
changed: (inquiry: ILivechatInquiryRecord) => {
if (inquiry.status !== 'queued' || (inquiry.department && !departments.has(inquiry.department))) {
return LivechatInquiry.remove(inquiry._id);
}
delete inquiry.type;

LivechatInquiry.upsert({ _id: inquiry._id }, { ...inquiry, alert: true, _updatedAt: new Date(inquiry._updatedAt) });
},
removed: (inquiry: ILivechatInquiryWithType) => LivechatInquiry.remove(inquiry._id),
};

const updateCollection = (inquiry: ILivechatInquiryWithType) => {
if (!inquiry.type) {
return;
}
events[inquiry.type](inquiry);
removed: (inquiry: ILivechatInquiryRecord) => LivechatInquiry.remove(inquiry._id),
};

const getInquiriesFromAPI = async () => {
Expand All @@ -42,7 +33,13 @@ const removeListenerOfDepartment = (departmentId: ILivechatDepartment['_id']) =>

const appendListenerToDepartment = (departmentId: ILivechatDepartment['_id']) => {
departments.add(departmentId);
sdk.stream('livechat-inquiry-queue-observer', [`department/${departmentId}`], updateCollection);
sdk.stream('livechat-inquiry-queue-observer', [`department/${departmentId}`], (args) => {
if (!('type' in args)) {
return;
}
const { type, ...inquiry } = args;
events[args.type](inquiry);
});
return () => removeListenerOfDepartment(departmentId);
};
const addListenerForeachDepartment = (departments: ILivechatDepartment['_id'][] = []) => {
Expand All @@ -54,14 +51,20 @@ const updateInquiries = async (inquiries: ILivechatInquiryRecord[] = []) =>
inquiries.forEach((inquiry) => LivechatInquiry.upsert({ _id: inquiry._id }, { ...inquiry, _updatedAt: new Date(inquiry._updatedAt) }));

const getAgentsDepartments = async (userId: IOmnichannelAgent['_id']) => {
const { departments } = await sdk.rest.get(`/v1/livechat/agents/${userId}/departments`, { enabledDepartmentsOnly: 'true' });
const { departments } = await APIClient.get(`/v1/livechat/agents/${userId}/departments`, { enabledDepartmentsOnly: 'true' });
ggazzo marked this conversation as resolved.
Show resolved Hide resolved
ggazzo marked this conversation as resolved.
Show resolved Hide resolved
return departments;
};

const removeGlobalListener = () => sdk.stop('livechat-inquiry-queue-observer', 'public');

const addGlobalListener = () => {
sdk.stream('livechat-inquiry-queue-observer', ['public'], updateCollection);
sdk.stream('livechat-inquiry-queue-observer', ['public'], (args) => {
if (!('type' in args)) {
return;
}
const { type, ...inquiry } = args;
events[args.type](inquiry);
});
return removeGlobalListener;
};

Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/app/notifications/server/lib/Notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Streamer } from '../../../../server/modules/streamer/streamer.module';

import './Presence';

class Stream extends Streamer {
class Stream extends Streamer<'local'> {
registerPublication(name: string, fn: (eventName: string, options: boolean | { useCollection?: boolean; args?: any }) => void): void {
Meteor.publish(name, function (eventName, options) {
return fn.call(this, eventName, options);
Expand Down
10 changes: 5 additions & 5 deletions apps/meteor/app/notifications/server/lib/Presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ const e = new Emitter<{
const clients = new WeakMap<Connection, UserPresence>();

class UserPresence {
private readonly streamer: IStreamer;
private readonly streamer: IStreamer<'user-presence'>;

private readonly publication: IPublication;

private readonly listeners: Set<string>;

constructor(publication: IPublication, streamer: IStreamer) {
constructor(publication: IPublication, streamer: IStreamer<'user-presence'>) {
this.listeners = new Set();
this.publication = publication;
this.streamer = streamer;
Expand Down Expand Up @@ -54,7 +54,7 @@ class UserPresence {
clients.delete(this.publication.connection);
}

static getClient(publication: IPublication, streamer: IStreamer): [UserPresence, boolean] {
static getClient(publication: IPublication, streamer: IStreamer<'user-presence'>): [UserPresence, boolean] {
const { connection } = publication;
const stored = clients.get(connection);

Expand All @@ -70,8 +70,8 @@ class UserPresence {

export class StreamPresence {
// eslint-disable-next-line @typescript-eslint/naming-convention
static getInstance(Streamer: IStreamerConstructor, name = 'user-presence'): IStreamer {
return new (class StreamPresence extends Streamer {
static getInstance(Streamer: IStreamerConstructor, name = 'user-presence'): IStreamer<'user-presence'> {
return new (class StreamPresence extends Streamer<'user-presence'> {
async _publish(
publication: IPublication,
_eventName: string,
Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/app/search/server/search.internalService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ class Search extends ServiceClassInternal {
constructor() {
super();

this.onEvent('watch.users', async ({ clientAction, data, id }) => {
this.onEvent('watch.users', async ({ clientAction, id, ...rest }) => {
if (clientAction === 'removed') {
searchEventService.promoteEvent('user.delete', id, undefined);
return;
}

const user = data ?? (await Users.findOneById(id));
const user = ('data' in rest && rest.data) || (await Users.findOneById(id));
searchEventService.promoteEvent('user.save', id, user);
});

Expand Down
2 changes: 1 addition & 1 deletion apps/meteor/client/lib/userData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export const synchronizeUserData = async (uid: IUser['_id']): Promise<RawUserDat
case 'inserted':
// eslint-disable-next-line @typescript-eslint/no-unused-vars
const { type, id, ...user } = data;
Users.insert(user as IUser);
Users.insert(user as unknown as IUser);
break;

case 'updated':
Expand Down
8 changes: 4 additions & 4 deletions apps/meteor/client/startup/incomingMessages.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IMessage, ISubscription } from '@rocket.chat/core-typings';
import type { IMessage } from '@rocket.chat/core-typings';
import { Meteor } from 'meteor/meteor';

import { ChatMessage } from '../../app/models/client';
Expand All @@ -20,16 +20,16 @@ Meteor.startup(() => {
});

CachedCollectionManager.onLogin(() => {
Notifications.onUser('subscriptions-changed', (_action: unknown, sub: ISubscription) => {
Notifications.onUser('subscriptions-changed', (_action, sub) => {
ChatMessage.update(
{
rid: sub.rid,
...(sub?.ignored ? { 'u._id': { $nin: sub.ignored } } : { ignored: { $exists: true } }),
...('ignored' in sub && sub.ignored ? { 'u._id': { $nin: sub.ignored } } : { ignored: { $exists: true } }),
},
{ $unset: { ignored: true } },
{ multi: true },
);
if (sub?.ignored) {
if ('ignored' in sub && sub.ignored) {
ChatMessage.update(
{ 'rid': sub.rid, 't': { $ne: 'command' }, 'u._id': { $in: sub.ignored } },
{ $set: { ignored: true } },
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ISubscription, IUser } from '@rocket.chat/core-typings';
import type { AtLeast, ISubscription, IUser } from '@rocket.chat/core-typings';
import { FlowRouter } from 'meteor/kadira:flow-router';
import { Meteor } from 'meteor/meteor';
import { Tracker } from 'meteor/tracker';
Expand All @@ -12,7 +12,7 @@ import { RoomManager } from '../../lib/RoomManager';
import { fireGlobalEvent } from '../../lib/utils/fireGlobalEvent';
import { isLayoutEmbedded } from '../../lib/utils/isLayoutEmbedded';

const notifyNewRoom = async (sub: ISubscription): Promise<void> => {
const notifyNewRoom = async (sub: AtLeast<ISubscription, 'rid'>): Promise<void> => {
const user = Meteor.user() as IUser | null;
if (!user || user.status === 'busy') {
return;
Expand Down Expand Up @@ -76,7 +76,10 @@ Meteor.startup(() => {
void notifyNewRoom(sub);
});

Notifications.onUser('subscriptions-changed', (_, sub) => {
Notifications.onUser('subscriptions-changed', (action, sub) => {
if (action === 'removed') {
return;
}
void notifyNewRoom(sub);
});
});
Expand Down
6 changes: 4 additions & 2 deletions apps/meteor/client/startup/notifications/updateAvatar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import { Notifications } from '../../../app/notifications/client';

Meteor.startup(() => {
Notifications.onLogged('updateAvatar', (data) => {
const { username, etag } = data;
username && Meteor.users.update({ username }, { $set: { avatarETag: etag } });
if ('username' in data) {
const { username, etag } = data;
username && Meteor.users.update({ username }, { $set: { avatarETag: etag } });
}
});
});
6 changes: 6 additions & 0 deletions apps/meteor/client/startup/userRoles.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ Meteor.startup(() => {
Notifications.onLogged('roles-change', (role) => {
if (role.type === 'added') {
if (!role.scope) {
if (!role.u) {
return;
}
UserRoles.upsert({ _id: role.u._id }, { $addToSet: { roles: role._id }, $set: { username: role.u.username } });
ChatMessage.update({ 'u._id': role.u._id }, { $addToSet: { roles: role._id } }, { multi: true });
}
Expand All @@ -32,6 +35,9 @@ Meteor.startup(() => {

if (role.type === 'removed') {
if (!role.scope) {
if (!role.u) {
return;
}
UserRoles.update({ _id: role.u._id }, { $pull: { roles: role._id } });
ChatMessage.update({ 'u._id': role.u._id }, { $pull: { roles: role._id } }, { multi: true });
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IRole, IRoom, IUser } from '@rocket.chat/core-typings';
import type { IRoom, IUser } from '@rocket.chat/core-typings';
import { useMethod, useStream } from '@rocket.chat/ui-contexts';
import { useEffect } from 'react';

Expand Down Expand Up @@ -52,36 +52,25 @@ export const useRoomRolesManagement = (rid: IRoom['_id']): void => {

useEffect(
() =>
subscribeToNotifyLoggedIn(
'roles-change',
({
type,
...role
}: {
type: 'added' | 'removed' | 'changed';
_id: IRole['_id'];
u: {
_id: IUser['_id'];
username: IUser['username'];
name: IUser['name'];
};
scope?: IRoom['_id'];
}) => {
if (!role.scope) {
return;
}
subscribeToNotifyLoggedIn('roles-change', ({ type, ...role }) => {
if (!role.scope) {
return;
}

if (!role.u?._id) {
return;
}

switch (type) {
case 'added':
RoomRoles.upsert({ 'rid': role.scope, 'u._id': role.u._id }, { $setOnInsert: { u: role.u }, $addToSet: { roles: role._id } });
break;
switch (type) {
case 'added':
RoomRoles.upsert({ 'rid': role.scope, 'u._id': role.u._id }, { $setOnInsert: { u: role.u }, $addToSet: { roles: role._id } });
break;

case 'removed':
RoomRoles.update({ 'rid': role.scope, 'u._id': role.u._id }, { $pull: { roles: role._id } });
break;
}
},
),
case 'removed':
RoomRoles.update({ 'rid': role.scope, 'u._id': role.u._id }, { $pull: { roles: role._id } });
break;
}
}),
[subscribeToNotifyLoggedIn],
);

Expand Down
4 changes: 2 additions & 2 deletions apps/meteor/client/views/room/providers/RoomProvider.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { IOmnichannelRoom, IRoom } from '@rocket.chat/core-typings';
import type { IRoom } from '@rocket.chat/core-typings';
import { isOmnichannelRoom } from '@rocket.chat/core-typings';
import { useRoute, useStream } from '@rocket.chat/ui-contexts';
import { useQueryClient } from '@tanstack/react-query';
Expand Down Expand Up @@ -41,7 +41,7 @@ const RoomProvider = ({ rid, children }: RoomProviderProps): ReactElement => {
return;
}

return subscribeToRoom(rid, (room: IRoom | IOmnichannelRoom) => {
return subscribeToRoom(rid, (room) => {
queryClient.setQueryData(['rooms', rid], room);
});
}, [subscribeToRoom, rid, queryClient, room]);
Expand Down
Loading