Skip to content

Commit

Permalink
Merge pull request #251 from matrix-org/hs/membership-queue
Browse files Browse the repository at this point in the history
Membership Queue component
  • Loading branch information
Half-Shot authored Oct 20, 2020
2 parents 66156e7 + ce7ba9f commit 571b868
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 8 deletions.
1 change: 1 addition & 0 deletions changelog.d/251.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `MembershipQueue` component
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"matrix-js-sdk": "^8.4.1",
"nedb": "^1.8.0",
"nopt": "^4.0.3",
"p-queue": "^6.6.0",
"p-queue": "^6.6.2",
"prom-client": "^12.0.0",
"winston": "^3.3.3",
"winston-daily-rotate-file": "^4.5.0"
Expand Down
148 changes: 148 additions & 0 deletions src/components/membership-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import { Bridge } from "../bridge";
import { Request } from "./request";
import { get as getLogger } from "./logging";
import PQueue from "p-queue";

const log = getLogger("MembershipQueue");

interface QueueUserItem {
type: "join"|"leave";
kickUser?: string;
reason?: string;
attempts: number;
roomId: string;
userId: string;
retry: boolean;
req: Request<unknown>;
}

export interface MembershipQueueOpts {
concurrentRoomLimit: number;
maxAttempts: number;
joinDelayMs: number;
maxJoinDelayMs: number;
}

const DEFAULT_OPTS = {
concurrentRoomLimit: 8,
maxAttempts: 10,
joinDelayMs: 500,
maxJoinDelayMs: 30 * 60 * 1000, // 30 mins
};


/**
* This class sends membership changes for rooms in a linearized queue.
*/
export class MembershipQueue {
private queues: Map<number, PQueue> = new Map();

constructor(private bridge: Bridge, private opts: MembershipQueueOpts) {
this.opts = { ...DEFAULT_OPTS, ...this.opts};
for (let i = 0; i < this.opts.concurrentRoomLimit; i++) {
this.queues.set(i, new PQueue({
autoStart: true,
concurrency: 1,
}));
}
}

/**
* Join a user to a room
* @param roomId The roomId to join
* @param userId Leave empty to act as the bot user.
* @param req The request entry for logging context
* @param retry Should the request retry if it fails
*/
public async join(roomId: string, userId: string|undefined, req: Request<unknown>, retry = true) {
return this.queueMembership({
roomId,
userId: userId || this.bridge.botUserId,
retry,
req,
attempts: 0,
type: "join",
});
}

/**
* Leave OR kick a user from a room
* @param roomId The roomId to leave
* @param userId Leave empty to act as the bot user.
* @param req The request entry for logging context
* @param retry Should the request retry if it fails
* @param reason Reason for leaving/kicking
* @param kickUser The user to be kicked. If left blank, this will be a leave.
*/
public async leave(roomId: string, userId: string, req: Request<unknown>,
retry = true, reason?: string, kickUser?: string) {
return this.queueMembership({
roomId,
userId: userId || this.bridge.botUserId,
retry,
req,
attempts: 0,
reason,
kickUser,
type: "leave",
})
}

public async queueMembership(item: QueueUserItem) {
try {
const queue = this.queues.get(this.hashRoomId(item.roomId));
if (!queue) {
throw Error("Could not find queue for hash");
}
queue.add(() => this.serviceQueue(item));
}
catch (ex) {
log.error(`Failed to handle membership: ${ex}`);
throw ex;
}
}

private hashRoomId(roomId: string) {
return Array.from(roomId).map((s) => s.charCodeAt(0)).reduce((a, b) => a + b, 0)
% this.opts.concurrentRoomLimit;
}

private async serviceQueue(item: QueueUserItem) {
const { req, roomId, userId, reason, kickUser, attempts, type } = item;
const reqIdStr = req.getId() ? `[${req.getId()}]`: "";
log.debug(`${reqIdStr} ${userId}@${roomId} -> ${type} (reason: ${reason || "none"}, kicker: ${kickUser})`);
const intent = this.bridge.getIntent(kickUser || userId);
try {
if (type === "join") {
await intent.join(roomId);
}
else if (kickUser) {
await intent.kick(roomId, userId, reason);
}
else {
await intent.leave(roomId, reason);
}
}
catch (ex) {
if (!this.shouldRetry(ex, attempts)) {
throw ex;
}
const delay = Math.min(
(this.opts.joinDelayMs * attempts) + (Math.random() * 500),
this.opts.maxJoinDelayMs
);
log.warn(`${reqIdStr} Failed to ${type} ${roomId}, delaying for ${delay}ms`);
log.debug(`${reqIdStr} Failed with: ${ex.errcode} ${ex.message}`);
await new Promise((r) => setTimeout(r, delay));
this.queueMembership({...item, attempts: item.attempts + 1});
}
}

private shouldRetry(ex: {code: string; errcode: string; httpStatus: number}, attempts: number): boolean {
return !(
attempts === this.opts.maxAttempts ||
ex.errcode === "M_FORBIDDEN" ||
ex.httpStatus === 403
);
}
}
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ export * from "matrix-appservice";
export * from "./components/prometheusmetrics";
export * from "./components/agecounters";
export * from "./components/membership-cache";
export * from "./components/membership-queue";
export * as Logging from "./components/logging";
export { unstable } from "./errors";
export * from "./components/event-types";
Expand Down

0 comments on commit 571b868

Please sign in to comment.