Skip to content

Commit

Permalink
fix: make io.to(...) immutable
Browse files Browse the repository at this point in the history
Previously, broadcasting to a given room (by calling `io.to()`) would
mutate the io instance, which could lead to surprising behaviors, like:

```js
io.to("room1");
io.to("room2").emit(...); // also sent to room1

// or with async/await
io.to("room3").emit("details", await fetchDetails()); // random behavior: maybe in room3, maybe to all clients
```

Calling `io.to()` (or any other broadcast modifier) will now return an
immutable instance.

Related:

- #3431
- #3444
  • Loading branch information
darrachequesne committed Mar 1, 2021
1 parent 7de2e87 commit ac9e8ca
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 136 deletions.
151 changes: 151 additions & 0 deletions lib/broadcast-operator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import type { BroadcastFlags, Room, SocketId } from "socket.io-adapter";
import { RESERVED_EVENTS } from "./socket";
import { PacketType } from "socket.io-parser";
import type { Adapter } from "socket.io-adapter";

export class BroadcastOperator {
constructor(
private readonly adapter: Adapter,
private readonly rooms: Set<Room> = new Set<Room>(),
private readonly exceptRooms: Set<Room> = new Set<Room>(),
private readonly flags: BroadcastFlags = {}
) {}

/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public to(room: Room): BroadcastOperator {
return new BroadcastOperator(
this.adapter,
new Set([...this.rooms, room]),
this.exceptRooms,
this.flags
);
}

/**
* Targets a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public in(room: Room): BroadcastOperator {
return this.to(room);
}

/**
* Excludes a room when emitting.
*
* @param room
* @return a new BroadcastOperator instance
* @public
*/
public except(room: Room): BroadcastOperator {
return new BroadcastOperator(
this.adapter,
this.rooms,
new Set([...this.exceptRooms, room]),
this.flags
);
}

/**
* Sets the compress flag.
*
* @param compress - if `true`, compresses the sending data
* @return a new BroadcastOperator instance
* @public
*/
public compress(compress: boolean): BroadcastOperator {
const flags = Object.assign({}, this.flags, { compress });
return new BroadcastOperator(
this.adapter,
this.rooms,
this.exceptRooms,
flags
);
}

/**
* Sets a modifier for a subsequent event emission that the event data may be lost if the client is not ready to
* receive messages (because of network slowness or other issues, or because they’re connected through long polling
* and is in the middle of a request-response cycle).
*
* @return a new BroadcastOperator instance
* @public
*/
public get volatile(): BroadcastOperator {
const flags = Object.assign({}, this.flags, { volatile: true });
return new BroadcastOperator(
this.adapter,
this.rooms,
this.exceptRooms,
flags
);
}

/**
* Sets a modifier for a subsequent event emission that the event data will only be broadcast to the current node.
*
* @return a new BroadcastOperator instance
* @public
*/
public get local(): BroadcastOperator {
const flags = Object.assign({}, this.flags, { local: true });
return new BroadcastOperator(
this.adapter,
this.rooms,
this.exceptRooms,
flags
);
}

/**
* Emits to all clients.
*
* @return Always true
* @public
*/
public emit(ev: string | Symbol, ...args: any[]): true {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${ev}" is a reserved event name`);
}
// set up packet object
args.unshift(ev);
const packet = {
type: PacketType.EVENT,
data: args,
};

if ("function" == typeof args[args.length - 1]) {
throw new Error("Callbacks are not supported when broadcasting");
}

this.adapter.broadcast(packet, {
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
});

return true;
}

/**
* Gets a list of clients.
*
* @public
*/
public allSockets(): Promise<Set<SocketId>> {
if (!this.adapter) {
throw new Error(
"No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?"
);
}
return this.adapter.sockets(this.rooms);
}
}
30 changes: 13 additions & 17 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import debugModule from "debug";
import { Socket } from "./socket";
import type { CookieSerializeOptions } from "cookie";
import type { CorsOptions } from "cors";
import type { BroadcastOperator } from "./broadcast-operator";

const debug = debugModule("socket.io:server");

Expand Down Expand Up @@ -624,25 +625,23 @@ export class Server extends EventEmitter {
/**
* Targets a room when emitting.
*
* @param name
* @param room
* @return self
* @public
*/
public to(name: Room): this {
this.sockets.to(name);
return this;
public to(room: Room): BroadcastOperator {
return this.sockets.to(room);
}

/**
* Targets a room when emitting.
*
* @param name
* @param room
* @return self
* @public
*/
public in(name: Room): this {
this.sockets.in(name);
return this;
public in(room: Room): BroadcastOperator {
return this.sockets.in(room);
}

/**
Expand Down Expand Up @@ -695,9 +694,8 @@ export class Server extends EventEmitter {
* @return self
* @public
*/
public compress(compress: boolean): this {
this.sockets.compress(compress);
return this;
public compress(compress: boolean): BroadcastOperator {
return this.sockets.compress(compress);
}

/**
Expand All @@ -708,9 +706,8 @@ export class Server extends EventEmitter {
* @return self
* @public
*/
public get volatile(): this {
this.sockets.volatile;
return this;
public get volatile(): BroadcastOperator {
return this.sockets.volatile;
}

/**
Expand All @@ -719,9 +716,8 @@ export class Server extends EventEmitter {
* @return self
* @public
*/
public get local(): this {
this.sockets.local;
return this;
public get local(): BroadcastOperator {
return this.sockets.local;
}
}

Expand Down
89 changes: 19 additions & 70 deletions lib/namespace.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Socket, RESERVED_EVENTS } from "./socket";
import { Socket } from "./socket";
import type { Server } from "./index";
import type { Client } from "./client";
import { EventEmitter } from "events";
import { PacketType } from "socket.io-parser";
import debugModule from "debug";
import type { Adapter, Room, SocketId } from "socket.io-adapter";
import { BroadcastOperator } from "./broadcast-operator";

const debug = debugModule("socket.io:namespace");

Expand All @@ -26,15 +26,6 @@ export class Namespace extends EventEmitter {
(socket: Socket, next: (err?: ExtendedError) => void) => void
> = [];

/** @private */
_rooms: Set<Room> = new Set();

/** @private */
_except: Set<Room> = new Set();

/** @private */
_flags: any = {};

/** @private */
_ids: number = 0;

Expand Down Expand Up @@ -105,37 +96,34 @@ export class Namespace extends EventEmitter {
/**
* Targets a room when emitting.
*
* @param name
* @param room
* @return self
* @public
*/
public to(name: Room): this {
this._rooms.add(name);
return this;
public to(room: Room): BroadcastOperator {
return new BroadcastOperator(this.adapter).to(room);
}

/**
* Targets a room when emitting.
*
* @param name
* @param room
* @return self
* @public
*/
public in(name: Room): this {
this._rooms.add(name);
return this;
public in(room: Room): BroadcastOperator {
return new BroadcastOperator(this.adapter).in(room);
}

/**
* Excludes a room when emitting.
*
* @param name
* @param room
* @return self
* @public
*/
public except(name: Room): Namespace {
this._except.add(name);
return this;
public except(room: Room): BroadcastOperator {
return new BroadcastOperator(this.adapter).except(room);
}

/**
Expand Down Expand Up @@ -202,36 +190,7 @@ export class Namespace extends EventEmitter {
* @public
*/
public emit(ev: string | Symbol, ...args: any[]): true {
if (RESERVED_EVENTS.has(ev)) {
throw new Error(`"${ev}" is a reserved event name`);
}
// set up packet object
args.unshift(ev);
const packet = {
type: PacketType.EVENT,
data: args,
};

if ("function" == typeof args[args.length - 1]) {
throw new Error("Callbacks are not supported when broadcasting");
}

const rooms = new Set(this._rooms);
const flags = Object.assign({}, this._flags);
const except = new Set(this._except);

// reset flags
this._rooms.clear();
this._flags = {};
this._except.clear();

this.adapter.broadcast(packet, {
rooms: rooms,
flags: flags,
except: except,
});

return true;
return new BroadcastOperator(this.adapter).emit(ev, ...args);
}

/**
Expand Down Expand Up @@ -263,14 +222,7 @@ export class Namespace extends EventEmitter {
* @public
*/
public allSockets(): Promise<Set<SocketId>> {
if (!this.adapter) {
throw new Error(
"No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?"
);
}
const rooms = new Set(this._rooms);
this._rooms.clear();
return this.adapter.sockets(rooms);
return new BroadcastOperator(this.adapter).allSockets();
}

/**
Expand All @@ -280,9 +232,8 @@ export class Namespace extends EventEmitter {
* @return self
* @public
*/
public compress(compress: boolean): this {
this._flags.compress = compress;
return this;
public compress(compress: boolean): BroadcastOperator {
return new BroadcastOperator(this.adapter).compress(compress);
}

/**
Expand All @@ -293,9 +244,8 @@ export class Namespace extends EventEmitter {
* @return self
* @public
*/
public get volatile(): this {
this._flags.volatile = true;
return this;
public get volatile(): BroadcastOperator {
return new BroadcastOperator(this.adapter).volatile;
}

/**
Expand All @@ -304,8 +254,7 @@ export class Namespace extends EventEmitter {
* @return self
* @public
*/
public get local(): this {
this._flags.local = true;
return this;
public get local(): BroadcastOperator {
return new BroadcastOperator(this.adapter).local;
}
}
Loading

0 comments on commit ac9e8ca

Please sign in to comment.