-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathudpStream.js
61 lines (56 loc) · 1.88 KB
/
udpStream.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
const udp = require('dgram');
const stream = require('readable-stream');
const util = require('util');
const uuid = require('uuid').v4;
/**
 * Writable stream that sends every written chunk over a UDP socket, each
 * datagram being prefixed with a 16-byte id generated by `uuid`.
 *
 * @param {object} [config] - Also passed through to the Writable constructor
 *   (with `autoDestroy: true` as a default).
 * @param {string} [config.type='udp4'] - Socket type given to `dgram.createSocket`.
 * @param {string} [config.host] - Destination host for outgoing datagrams.
 * @param {number} [config.port] - Destination port for outgoing datagrams.
 * @param {number} [config.max] - When set, longer messages are dropped by `_write`.
 * @param {number} [config.mtu=1400] - Datagram size budget; the id length is
 *   subtracted so `this.mtu` is the payload size per frame.
 */
function UdpStream(config) {
    config = config || {};
    stream.Writable.call(this, {autoDestroy: true, ...config});
    this.socket = udp.createSocket(config.type || 'udp4');
    this.id = Buffer.alloc(16);
    uuid(null, this.id);
    this.socket.on('message', msg => {
        // Incoming datagrams are untrusted: anything that is not valid JSON
        // or is not a recognised command is deliberately ignored.
        try {
            const command = JSON.parse(msg.toString('utf8'));
            if (command && command.method === 'uuid') {
                // The peer asked for a fresh id: regenerate it in place.
                uuid(null, this.id);
            }
        } catch (e) {}
    });
    const ignore = () => {};
    this.on('error', ignore); // ignore udp errors
    // An 'error' event without a listener is thrown by EventEmitter, which
    // would take the whole process down; apply the same ignore policy to the
    // socket itself.
    this.socket.on('error', ignore);
    this.host = config.host;
    this.port = config.port;
    this.max = config.max;
    this.mtu = (config.mtu || 1400) - this.id.length;
}
util.inherits(UdpStream, stream.Writable);
/**
 * Writable `_write` implementation: sends one message as a sequence of UDP
 * datagrams ("frames"), each consisting of the stream id followed by up to
 * `this.mtu` bytes of the message.
 *
 * Messages longer than `this.max` (when set) are dropped without an error.
 * Frames are sent one after another; a send error aborts the remaining
 * frames and is reported through `done`.
 *
 * @param {Buffer|string} message - Chunk to send; strings are encoded first.
 * @param {string} encoding - Encoding used when `message` is a string.
 * @param {function(Error=)} done - Called once the last frame was handed to
 *   the socket, or with the error that stopped the transfer.
 */
UdpStream.prototype._write = function(message, encoding, done) {
    if (typeof message === 'string') {
        message = Buffer.from(message, encoding);
    }
    if (this.max && message && message.length > this.max) {
        done();
        return;
    }
    const frameSize = this.mtu;
    if (!(frameSize > 0)) {
        // A zero, negative or NaN frame size would never advance through the
        // message and would keep rescheduling sends forever.
        done(new RangeError('mtu is too small to carry any payload'));
        return;
    }
    // Take a real copy of the id: Buffer#slice() only returns a view on the
    // same memory, so an in-place regeneration of this.id between frames
    // would change the id of the frames still to be sent.
    const id = Buffer.from(this.id);
    const send = (start, length, cb) => {
        const frame = Buffer.concat([id, message.slice(start, start + length)]);
        this.socket.send(frame, this.port, this.host, cb);
    };
    const sendFrame = start => {
        if (start + frameSize >= message.length) {
            // Last (or only) frame: completion of this send completes the write.
            send(start, message.length - start, done);
            return;
        }
        send(start, frameSize, err => {
            if (err) {
                done(err);
                return;
            }
            // Yield to the event loop between frames.
            setImmediate(() => sendFrame(start + frameSize));
        });
    };
    sendFrame(0);
};
/**
 * Writable `_destroy` implementation: closes the UDP socket and then reports
 * completion.
 *
 * @param {Error|null} error - Error that triggered the destroy, if any.
 * @param {function(Error=)} [callback] - Invoked from the socket's close
 *   callback with the close error when one is passed, otherwise with `error`.
 */
UdpStream.prototype._destroy = function(error, callback) {
    const onClosed = closeError => {
        if (callback) {
            callback(closeError || error);
        }
    };
    this.socket.close(onClosed);
};
module.exports = config => new UdpStream(config);