diff --git a/changelog.d/251.feature b/changelog.d/251.feature new file mode 100644 index 00000000..8a7160bf --- /dev/null +++ b/changelog.d/251.feature @@ -0,0 +1 @@ +Add `MembershipQueue` component diff --git a/package-lock.json b/package-lock.json index f9620b9f..23b2480d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1606,9 +1606,9 @@ "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=" }, "eventemitter3": { - "version": "4.0.4", - "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.4.tgz", - "integrity": "sha512-rlaVLnVxtxvoyLsQQFBx53YmXHDxRIzzTLbdfxqi4yocpSjAxXwkU0cScM5JgSKMqEhrZpnvQ2D9gjylR0AimQ==" + "version": "4.0.7", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.7.tgz", + "integrity": "sha512-8guHBZCwKnFhYdHr2ysuRWErTwhoN2X8XELRlrRwpmfeY2jjuUN4taQMsULKUVo1K4DvZl+0pgfyoysHxvmvEw==" }, "expand-brackets": { "version": "2.1.4", @@ -3407,12 +3407,12 @@ } }, "p-queue": { - "version": "6.6.0", - "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.0.tgz", - "integrity": "sha512-zPHXPNy9jZsiym0PpJjvnHQysx1fSd/QdaNVwiDRLU2KFChD6h9CkCB6b8i3U8lBwJyA+mHgNZCzcy77glUssQ==", + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.6.2.tgz", + "integrity": "sha512-RwFpb72c/BhQLEXIZ5K2e+AhgNVmIejGlTgiB9MzZ0e93GRvqZ7uSi0dvRF7/XIXDeNkra2fNHBxTyPDGySpjQ==", "requires": { "eventemitter3": "^4.0.4", - "p-timeout": "^3.1.0" + "p-timeout": "^3.2.0" } }, "p-timeout": { diff --git a/package.json b/package.json index e2bd000a..4ff95a0d 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "matrix-js-sdk": "^8.4.1", "nedb": "^1.8.0", "nopt": "^4.0.3", - "p-queue": "^6.6.0", + "p-queue": "^6.6.2", "prom-client": "^12.0.0", "winston": "^3.3.3", "winston-daily-rotate-file": "^4.5.0" diff --git a/src/components/membership-queue.ts b/src/components/membership-queue.ts new file mode 100644 index 00000000..d01b1f43 --- /dev/null +++ b/src/components/membership-queue.ts @@ -0,0 +1,148 @@ +import { Bridge } from "../bridge"; +import { Request } from "./request"; +import { get as getLogger } from "./logging"; +import PQueue from "p-queue"; + +const log = getLogger("MembershipQueue"); + +interface QueueUserItem { + type: "join"|"leave"; + kickUser?: string; + reason?: string; + attempts: number; + roomId: string; + userId: string; + retry: boolean; + req: Request; +} + +export interface MembershipQueueOpts { + concurrentRoomLimit: number; + maxAttempts: number; + joinDelayMs: number; + maxJoinDelayMs: number; +} + +const DEFAULT_OPTS = { + concurrentRoomLimit: 8, + maxAttempts: 10, + joinDelayMs: 500, + maxJoinDelayMs: 30 * 60 * 1000, // 30 mins +}; + + +/** + * This class sends membership changes for rooms in a linearized queue. + */ +export class MembershipQueue { + private queues: Map = new Map(); + + constructor(private bridge: Bridge, private opts: MembershipQueueOpts) { + this.opts = { ...DEFAULT_OPTS, ...this.opts}; + for (let i = 0; i < this.opts.concurrentRoomLimit; i++) { + this.queues.set(i, new PQueue({ + autoStart: true, + concurrency: 1, + })); + } + } + + /** + * Join a user to a room + * @param roomId The roomId to join + * @param userId Leave empty to act as the bot user. + * @param req The request entry for logging context + * @param retry Should the request retry if it fails + */ + public async join(roomId: string, userId: string|undefined, req: Request, retry = true) { + return this.queueMembership({ + roomId, + userId: userId || this.bridge.botUserId, + retry, + req, + attempts: 0, + type: "join", + }); + } + + /** + * Leave OR kick a user from a room + * @param roomId The roomId to leave + * @param userId Leave empty to act as the bot user. + * @param req The request entry for logging context + * @param retry Should the request retry if it fails + * @param reason Reason for leaving/kicking + * @param kickUser The user to be kicked. If left blank, this will be a leave. + */ + public async leave(roomId: string, userId: string, req: Request, + retry = true, reason?: string, kickUser?: string) { + return this.queueMembership({ + roomId, + userId: userId || this.bridge.botUserId, + retry, + req, + attempts: 0, + reason, + kickUser, + type: "leave", + }) + } + + public async queueMembership(item: QueueUserItem) { + try { + const queue = this.queues.get(this.hashRoomId(item.roomId)); + if (!queue) { + throw Error("Could not find queue for hash"); + } + queue.add(() => this.serviceQueue(item)); + } + catch (ex) { + log.error(`Failed to handle membership: ${ex}`); + throw ex; + } + } + + private hashRoomId(roomId: string) { + return Array.from(roomId).map((s) => s.charCodeAt(0)).reduce((a, b) => a + b, 0) + % this.opts.concurrentRoomLimit; + } + + private async serviceQueue(item: QueueUserItem) { + const { req, roomId, userId, reason, kickUser, attempts, type } = item; + const reqIdStr = req.getId() ? `[${req.getId()}]`: ""; + log.debug(`${reqIdStr} ${userId}@${roomId} -> ${type} (reason: ${reason || "none"}, kicker: ${kickUser})`); + const intent = this.bridge.getIntent(kickUser || userId); + try { + if (type === "join") { + await intent.join(roomId); + } + else if (kickUser) { + await intent.kick(roomId, userId, reason); + } + else { + await intent.leave(roomId, reason); + } + } + catch (ex) { + if (!this.shouldRetry(ex, attempts)) { + throw ex; + } + const delay = Math.min( + (this.opts.joinDelayMs * attempts) + (Math.random() * 500), + this.opts.maxJoinDelayMs + ); + log.warn(`${reqIdStr} Failed to ${type} ${roomId}, delaying for ${delay}ms`); + log.debug(`${reqIdStr} Failed with: ${ex.errcode} ${ex.message}`); + await new Promise((r) => setTimeout(r, delay)); + this.queueMembership({...item, attempts: item.attempts + 1}); + } + } + + private shouldRetry(ex: {code: string; errcode: string; httpStatus: number}, attempts: number): boolean { + return !( + attempts === this.opts.maxAttempts || + ex.errcode === "M_FORBIDDEN" || + ex.httpStatus === 403 + ); + } +} diff --git a/src/index.ts b/src/index.ts index 0a8c18f0..cf0637aa 100644 --- a/src/index.ts +++ b/src/index.ts @@ -47,6 +47,7 @@ export * from "matrix-appservice"; export * from "./components/prometheusmetrics"; export * from "./components/agecounters"; export * from "./components/membership-cache"; +export * from "./components/membership-queue"; export * as Logging from "./components/logging"; export { unstable } from "./errors"; export * from "./components/event-types";