-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathdatachannel-stream.ts
132 lines (111 loc) · 4.15 KB
/
datachannel-stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* SPDX-FileCopyrightText: 2022 Tim Perry <[email protected]>
* SPDX-License-Identifier: Apache-2.0
*/
import * as stream from 'stream';
import type * as NodeDataChannel from 'node-datachannel';
/**
 * Turns a node-datachannel DataChannel into a real Node.js stream, complete with
 * buffering, backpressure (up to a point - if the buffer fills up, messages are dropped),
 * and support for piping data elsewhere.
 *
 * Read & written data may be either UTF-8 strings or Buffers - this difference exists at
 * the protocol level, and is preserved here throughout.
 *
 * In addition to the standard stream events, instances emit:
 * - `read-data` (msg): for every message received from the channel, before (and
 *   regardless of whether) it is pushed into the readable buffer.
 * - `wrote-data` (chunk): after a chunk has been passed to the channel's send method
 *   without that method throwing.
 * - `channel-open`: once the underlying channel is open.
 */
export class DataChannelStream extends stream.Duplex {

    /**
     * @param rawChannel The channel to wrap. Its message/closed/error/open callbacks
     * are registered by this constructor.
     * @param streamOptions The supported subset of Duplex options. `allowHalfOpen`
     * defaults to false here, and `objectMode` is always forced on.
     */
    constructor(
        private readonly rawChannel: NodeDataChannel.DataChannel,
        streamOptions: {
            // These are the only Duplex options supported:
            readableHighWaterMark?: number | undefined;
            writableHighWaterMark?: number | undefined;
            allowHalfOpen?: boolean;
        } = {}
    ) {
        super({
            allowHalfOpen: false, // Default to autoclose on end().
            ...streamOptions,
            objectMode: true // Preserve the string/buffer distinction (WebRTC treats them differently)
        });

        rawChannel.onMessage((msg) => {
            // Independently of the stream and its normal events, we also fire our own
            // read/wrote-data events, used for MockRTC event subscriptions. These aren't
            // buffered, and this ensures that those events do not consume data that will
            // separately be processed by handler steps.
            this.emit('read-data', msg);

            if (!this._readActive) return; // If the buffer is full, drop messages.

            // If the push is rejected, we pause reading until the next call to _read().
            this._readActive = this.push(msg);
        });

        // When the DataChannel closes, the readable & writable ends close
        rawChannel.onClosed(() => {
            this.push(null);
            this.destroy();
        });

        rawChannel.onError((errMsg) => {
            this.destroy(new Error(`DataChannel error: ${errMsg}`));
        });

        // Buffer all writes until the DataChannel opens
        if (!rawChannel.isOpen()) {
            this.cork();
            rawChannel.onOpen(() => {
                this.uncork();
                this._isOpen = true;
                this.emit('channel-open');
            });
        } else {
            // Already open: defer the notification so that listeners attached right
            // after construction still receive 'channel-open'.
            setImmediate(() => {
                this._isOpen = true;
                this.emit('channel-open');
            });
        }
    }

    /** Set to true immediately before 'channel-open' is emitted. */
    private _isOpen = false;

    /** Whether the 'channel-open' event has fired for this stream. */
    get isOpen() {
        return this._isOpen;
    }

    /** False while incoming messages are being dropped because push() returned false. */
    private _readActive = true;

    /** Readable hook: the consumer wants more data, so resume accepting messages. */
    _read() {
        // Stop dropping messages, if the buffer filling up meant we were doing so before.
        this._readActive = true;
    }

    /**
     * Writable hook: sends a single chunk over the channel. Buffers are sent as binary
     * messages and strings as text messages; any other value is rejected.
     *
     * @param chunk The value to send.
     * @param encoding Unused - the stream is in object mode.
     * @param callback Called with null on success, or with an Error if the chunk type
     * is unsupported, the send call throws, or the send call returns false.
     */
    _write(chunk: string | Buffer | unknown, encoding: string, callback: (error: Error | null) => void) {
        let sentOk: boolean;

        try {
            if (Buffer.isBuffer(chunk)) {
                sentOk = this.rawChannel.sendMessageBinary(chunk);
            } else if (typeof chunk === 'string') {
                sentOk = this.rawChannel.sendMessage(chunk);
            } else {
                const typeName = DataChannelStream.describeChunkType(chunk);
                throw new Error(`Cannot write ${typeName} to DataChannel stream`);
            }

            this.emit('wrote-data', chunk);
        } catch (err: unknown) {
            // Always hand the stream machinery a real Error, whatever was thrown.
            return callback(err instanceof Error ? err : new Error(String(err)));
        }

        // NOTE(review): a false result is reported as a failure here - confirm against
        // the node-datachannel docs that false cannot also mean "queued for sending".
        if (sentOk) {
            callback(null);
        } else {
            callback(new Error("Failed to write to DataChannel"));
        }
    }

    /**
     * Produces a readable type name for an unsupported chunk, without throwing for
     * values that have no usable constructor (undefined, null, prototype-less objects).
     */
    private static describeChunkType(chunk: unknown): string {
        if (chunk === null) return 'null';
        if (chunk === undefined) return 'undefined';

        const ctor = (chunk as { constructor?: { name?: string } }).constructor;
        return (ctor && ctor.name) || typeof chunk;
    }

    /**
     * Writable hook, run once the writable side has ended. Unless half-open mode was
     * requested, ending the writable side destroys the whole stream.
     */
    _final(callback: (error: Error | null) => void) {
        if (!this.allowHalfOpen) this.destroy();
        callback(null);
    }

    /** Destroy hook: closes the underlying channel, then forwards any given error. */
    _destroy(maybeErr: Error | null, callback: (error: Error | null) => void) {
        // When the stream is destroyed, we close the DataChannel.
        this.rawChannel.close();
        callback(maybeErr);
    }

    /** The id reported by the underlying channel. */
    get id() {
        return this.rawChannel.getId();
    }

    /** The label reported by the underlying channel. */
    get label() {
        return this.rawChannel.getLabel();
    }

    /** The protocol reported by the underlying channel. */
    get protocol() {
        return this.rawChannel.getProtocol();
    }
}