-
Notifications
You must be signed in to change notification settings - Fork 476
/
connection.js
675 lines (605 loc) · 21.1 KB
/
connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
//
//
//
'use strict';
var defs = require('./defs');
var constants = defs.constants;
var frame = require('./frame');
var HEARTBEAT = frame.HEARTBEAT;
var Mux = require('./mux').Mux;
var Duplex = require('stream').Duplex;
var EventEmitter = require('events');
var Heart = require('./heartbeat').Heart;
var methodName = require('./format').methodName;
var closeMsg = require('./format').closeMessage;
var inspect = require('./format').inspect;
var BitSet = require('./bitset').BitSet;
var fmt = require('util').format;
var PassThrough = require('stream').PassThrough;
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;
// High-water mark for channel write buffers, in 'objects' (which are
// encoded frames as buffers).
var DEFAULT_WRITE_HWM = 1024;
// If all the frames of a message (method, properties, content) total
// to less than this, copy them into a single buffer and write it all
// at once. Note that this is less than the minimum frame size: if it
// was greater, we might have to fragment the content.
var SINGLE_CHUNK_THRESHOLD = 2048;
class Connection extends EventEmitter {
constructor (underlying) {
super();
var stream = this.stream = wrapStream(underlying);
this.muxer = new Mux(stream);
// frames
this.rest = Buffer.alloc(0);
this.frameMax = constants.FRAME_MIN_SIZE;
this.sentSinceLastCheck = false;
this.recvSinceLastCheck = false;
this.expectSocketClose = false;
this.freeChannels = new BitSet();
this.channels = [{
channel: { accept: channel0(this) },
buffer: underlying
}];
}
// This changed between versions, as did the codec, methods, etc. AMQP
// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
// much sense to generalise here.
sendProtocolHeader () {
this.sendBytes(frame.PROTOCOL_HEADER);
}
/*
The frighteningly complicated opening protocol (spec section 2.2.4):
Client -> Server
protocol header ->
<- start
start-ok ->
.. next two zero or more times ..
<- secure
secure-ok ->
<- tune
tune-ok ->
open ->
<- open-ok
If I'm only supporting SASL's PLAIN mechanism (which I am for the time
being), it gets a bit easier since the server won't in general send
back a `secure`, it'll just send `tune` after the `start-ok`.
(SASL PLAIN: http://tools.ietf.org/html/rfc4616)
*/
open (allFields, openCallback0) {
var self = this;
var openCallback = openCallback0 || function () { };
// This is where we'll put our negotiated values
var tunedOptions = Object.create(allFields);
function wait (k) {
self.step(function (err, frame) {
if (err !== null)
bail(err);
else if (frame.channel !== 0) {
bail(new Error(
fmt("Frame on channel != 0 during handshake: %s",
inspect(frame, false))));
}
else
k(frame);
});
}
function expect (Method, k) {
wait(function (frame) {
if (frame.id === Method)
k(frame);
else {
bail(new Error(
fmt("Expected %s; got %s",
methodName(Method), inspect(frame, false))));
}
});
}
function bail (err) {
openCallback(err);
}
function send (Method) {
// This can throw an exception if there's some problem with the
// options; e.g., something is a string instead of a number.
self.sendMethod(0, Method, tunedOptions);
}
function negotiate (server, desired) {
// We get sent values for channelMax, frameMax and heartbeat,
// which we may accept or lower (subject to a minimum for
// frameMax, but we'll leave that to the server to enforce). In
// all cases, `0` really means "no limit", or rather the highest
// value in the encoding, e.g., unsigned short for channelMax.
if (server === 0 || desired === 0) {
// i.e., whichever places a limit, if either
return Math.max(server, desired);
}
else {
return Math.min(server, desired);
}
}
function onStart (start) {
var mechanisms = start.fields.mechanisms.toString().split(' ');
if (mechanisms.indexOf(allFields.mechanism) < 0) {
bail(new Error(fmt('SASL mechanism %s is not provided by the server',
allFields.mechanism)));
return;
}
self.serverProperties = start.fields.serverProperties;
try {
send(defs.ConnectionStartOk);
} catch (err) {
bail(err);
return;
}
wait(afterStartOk);
}
function afterStartOk (reply) {
switch (reply.id) {
case defs.ConnectionSecure:
bail(new Error(
"Wasn't expecting to have to go through secure"));
break;
case defs.ConnectionClose:
bail(new Error(fmt("Handshake terminated by server: %s",
closeMsg(reply))));
break;
case defs.ConnectionTune:
var fields = reply.fields;
tunedOptions.frameMax =
negotiate(fields.frameMax, allFields.frameMax);
tunedOptions.channelMax =
negotiate(fields.channelMax, allFields.channelMax);
tunedOptions.heartbeat =
negotiate(fields.heartbeat, allFields.heartbeat);
try {
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
} catch (err) {
bail(err);
return;
}
expect(defs.ConnectionOpenOk, onOpenOk);
break;
default:
bail(new Error(
fmt("Expected connection.secure, connection.close, " +
"or connection.tune during handshake; got %s",
inspect(reply, false))));
break;
}
}
function onOpenOk (openOk) {
// Impose the maximum of the encoded value, if the negotiated
// value is zero, meaning "no, no limits"
self.channelMax = tunedOptions.channelMax || 0xffff;
self.frameMax = tunedOptions.frameMax || 0xffffffff;
// 0 means "no heartbeat", rather than "maximum period of
// heartbeating"
self.heartbeat = tunedOptions.heartbeat;
self.heartbeater = self.startHeartbeater();
self.accept = mainAccept;
succeed(openOk);
}
// If the server closes the connection, it's probably because of
// something we did
function endWhileOpening (err) {
bail(err || new Error('Socket closed abruptly ' +
'during opening handshake'));
}
this.stream.on('end', endWhileOpening);
this.stream.on('error', endWhileOpening);
function succeed (ok) {
self.stream.removeListener('end', endWhileOpening);
self.stream.removeListener('error', endWhileOpening);
self.stream.on('error', self.onSocketError.bind(self));
self.stream.on('end', self.onSocketError.bind(
self, new Error('Unexpected close')));
self.on('frameError', self.onSocketError.bind(self));
self.acceptLoop();
openCallback(null, ok);
}
// Now kick off the handshake by prompting the server
this.sendProtocolHeader();
expect(defs.ConnectionStart, onStart);
}
// Closing things: AMQP has a closing handshake that applies to
// closing both connects and channels. As the initiating party, I send
// Close, then ignore all frames until I see either CloseOK --
// which signifies that the other party has seen the Close and shut
// the connection or channel down, so it's fine to free resources; or
// Close, which means the other party also wanted to close the
// whatever, and I should send CloseOk so it can free resources,
// then go back to waiting for the CloseOk. If I receive a Close
// out of the blue, I should throw away any unsent frames (they will
// be ignored anyway) and send CloseOk, then clean up resources. In
// general, Close out of the blue signals an error (or a forced
// closure, which may as well be an error).
//
// RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
// | | [3]
// | +------ send CloseOk ------+
// recv Close recv CloseOk
// | |
// V V
// Ended [4] ---- send CloseOk ---> Closed [5]
//
// [1] All frames accepted; getting a Close frame from the server
// moves to Ended; client may initiate a close by sending Close
// itself.
// [2] Client has initiated a close; only CloseOk or (simulataneously
// sent) Close is accepted.
// [3] Simultaneous close
// [4] Server won't send any more frames; accept no more frames, send
// CloseOk.
// [5] Fully closed, client will send no more, server will send no
// more. Signal 'close' or 'error'.
//
// There are two signalling mechanisms used in the API. The first is
// that calling `close` will return a promise, that will either
// resolve once the connection or channel is cleanly shut down, or
// will reject if the shutdown times out.
//
// The second is the 'close' and 'error' events. These are
// emitted as above. The events will fire *before* promises are
// resolved.
// Close the connection without even giving a reason. Typical.
close (closeCallback) {
var k = closeCallback && function () { closeCallback(null); };
this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
}
// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
// ignores these; maybe it logs them. The continuation will be invoked
// when the CloseOk has been received, and before the 'close' event.
closeBecause (reason, code, k) {
this.sendMethod(0, defs.ConnectionClose, {
replyText: reason,
replyCode: code,
methodId: 0, classId: 0
});
var s = stackCapture('closeBecause called: ' + reason);
this.toClosing(s, k);
}
closeWithError (reason, code, error) {
this.emit('error', error);
this.closeBecause(reason, code);
}
onSocketError (err) {
if (!this.expectSocketClose) {
// forestall any more calls to onSocketError, since we're signed
// up for `'error'` *and* `'end'`
this.expectSocketClose = true;
this.emit('error', err);
var s = stackCapture('Socket error');
this.toClosed(s, err);
}
}
// A close has been initiated. Repeat: a close has been initiated.
// This means we should not send more frames, anyway they will be
// ignored. We also have to shut down all the channels.
toClosing (capturedStack, k) {
var send = this.sendMethod.bind(this);
this.accept = function (f) {
if (f.id === defs.ConnectionCloseOk) {
if (k)
k();
var s = stackCapture('ConnectionCloseOk received');
this.toClosed(s, undefined);
}
else if (f.id === defs.ConnectionClose) {
send(0, defs.ConnectionCloseOk, {});
}
// else ignore frame
};
invalidateSend(this, 'Connection closing', capturedStack);
}
_closeChannels (capturedStack) {
for (var i = 1; i < this.channels.length; i++) {
var ch = this.channels[i];
if (ch !== null) {
ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
}
}
}
// A close has been confirmed. Cease all communication.
toClosed (capturedStack, maybeErr) {
this._closeChannels(capturedStack);
var info = fmt('Connection closed (%s)',
(maybeErr) ? maybeErr.toString() : 'by client');
// Tidy up, invalidate enverything, dynamite the bridges.
invalidateSend(this, info, capturedStack);
this.accept = invalidOp(info, capturedStack);
this.close = function (cb) {
cb && cb(new IllegalOperationError(info, capturedStack));
};
if (this.heartbeater)
this.heartbeater.clear();
// This is certainly true now, if it wasn't before
this.expectSocketClose = true;
this.stream.end();
this.emit('close', maybeErr);
}
_updateSecret(newSecret, reason, cb) {
this.sendMethod(0, defs.ConnectionUpdateSecret, {
newSecret,
reason
});
this.once('update-secret-ok', cb);
}
// ===
startHeartbeater () {
if (this.heartbeat === 0)
return null;
else {
var self = this;
var hb = new Heart(this.heartbeat,
this.checkSend.bind(this),
this.checkRecv.bind(this));
hb.on('timeout', function () {
var hberr = new Error("Heartbeat timeout");
self.emit('error', hberr);
var s = stackCapture('Heartbeat timeout');
self.toClosed(s, hberr);
});
hb.on('beat', function () {
self.sendHeartbeat();
});
return hb;
}
}
// I use an array to keep track of the channels, rather than an
// object. The channel identifiers are numbers, and allocated by the
// connection. If I try to allocate low numbers when they are
// available (which I do, by looking from the start of the bitset),
// this ought to keep the array small, and out of 'sparse array
// storage'. I also set entries to null, rather than deleting them, in
// the expectation that the next channel allocation will fill the slot
// again rather than growing the array. See
// http://www.html5rocks.com/en/tutorials/speed/v8/
freshChannel (channel, options) {
var next = this.freeChannels.nextClearBit(1);
if (next < 0 || next > this.channelMax)
throw new Error("No channels left to allocate");
this.freeChannels.set(next);
var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
var writeBuffer = new PassThrough({
objectMode: true, highWaterMark: hwm
});
this.channels[next] = { channel: channel, buffer: writeBuffer };
writeBuffer.on('drain', function () {
channel.onBufferDrain();
});
this.muxer.pipeFrom(writeBuffer);
return next;
}
releaseChannel (channel) {
this.freeChannels.clear(channel);
var buffer = this.channels[channel].buffer;
buffer.end(); // will also cause it to be unpiped
this.channels[channel] = null;
}
acceptLoop () {
var self = this;
function go () {
try {
var f; while (f = self.recvFrame())
self.accept(f);
}
catch (e) {
self.emit('frameError', e);
}
}
self.stream.on('readable', go);
go();
}
step (cb) {
var self = this;
function recv () {
var f;
try {
f = self.recvFrame();
}
catch (e) {
cb(e, null);
return;
}
if (f)
cb(null, f);
else
self.stream.once('readable', recv);
}
recv();
}
checkSend () {
var check = this.sentSinceLastCheck;
this.sentSinceLastCheck = false;
return check;
}
checkRecv () {
var check = this.recvSinceLastCheck;
this.recvSinceLastCheck = false;
return check;
}
sendBytes (bytes) {
this.sentSinceLastCheck = true;
this.stream.write(bytes);
}
sendHeartbeat () {
return this.sendBytes(frame.HEARTBEAT_BUF);
}
sendMethod (channel, Method, fields) {
var frame = encodeMethod(Method, channel, fields);
this.sentSinceLastCheck = true;
var buffer = this.channels[channel].buffer;
return buffer.write(frame);
}
sendMessage (channel, Method, fields, Properties, props, content) {
if (!Buffer.isBuffer(content))
throw new TypeError('content is not a buffer');
var mframe = encodeMethod(Method, channel, fields);
var pframe = encodeProperties(Properties, channel,
content.length, props);
var buffer = this.channels[channel].buffer;
this.sentSinceLastCheck = true;
var methodHeaderLen = mframe.length + pframe.length;
var bodyLen = (content.length > 0) ?
content.length + FRAME_OVERHEAD : 0;
var allLen = methodHeaderLen + bodyLen;
if (allLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var all = Buffer.allocUnsafe(allLen);
var offset = mframe.copy(all, 0);
offset += pframe.copy(all, offset);
if (bodyLen > 0)
makeBodyFrame(channel, content).copy(all, offset);
return buffer.write(all);
}
else {
if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var both = Buffer.allocUnsafe(methodHeaderLen);
var offset = mframe.copy(both, 0);
pframe.copy(both, offset);
buffer.write(both);
}
else {
buffer.write(mframe);
buffer.write(pframe);
}
return this.sendContent(channel, content);
}
}
sendContent (channel, body) {
if (!Buffer.isBuffer(body)) {
throw new TypeError(fmt("Expected buffer; got %s", body));
}
var writeResult = true;
var buffer = this.channels[channel].buffer;
var maxBody = this.frameMax - FRAME_OVERHEAD;
for (var offset = 0; offset < body.length; offset += maxBody) {
var end = offset + maxBody;
var slice = (end > body.length) ? body.subarray(offset) : body.subarray(offset, end);
var bodyFrame = makeBodyFrame(channel, slice);
writeResult = buffer.write(bodyFrame);
}
this.sentSinceLastCheck = true;
return writeResult;
}
recvFrame () {
// %%% identifying invariants might help here?
var frame = parseFrame(this.rest, this.frameMax);
if (!frame) {
var incoming = this.stream.read();
if (incoming === null) {
return false;
}
else {
this.recvSinceLastCheck = true;
this.rest = Buffer.concat([this.rest, incoming]);
return this.recvFrame();
}
}
else {
this.rest = frame.rest;
return decodeFrame(frame);
}
}
}
// Usual frame accept mode
function mainAccept(frame) {
var rec = this.channels[frame.channel];
if (rec) { return rec.channel.accept(frame); }
// NB CHANNEL_ERROR may not be right, but I don't know what is ..
else
this.closeWithError(
fmt('Frame on unknown channel %d', frame.channel),
constants.CHANNEL_ERROR,
new Error(fmt("Frame on unknown channel: %s",
inspect(frame, false))));
}
// Handle anything that comes through on channel 0, that's the
// connection control channel. This is only used once mainAccept is
// installed as the frame handler, after the opening handshake.
function channel0(connection) {
return function(f) {
// Once we get a 'close', we know 1. we'll get no more frames, and
// 2. anything we send except close, or close-ok, will be
// ignored. If we already sent 'close', this won't be invoked since
// we're already in closing mode; if we didn't well we're not going
// to send it now are we.
if (f === HEARTBEAT); // ignore; it's already counted as activity
// on the socket, which is its purpose
else if (f.id === defs.ConnectionClose) {
// Oh. OK. I guess we're done here then.
connection.sendMethod(0, defs.ConnectionCloseOk, {});
var emsg = fmt('Connection closed: %s', closeMsg(f));
var s = stackCapture(emsg);
var e = new Error(emsg);
e.code = f.fields.replyCode;
if (isFatalError(e)) {
connection.emit('error', e);
}
connection.toClosed(s, e);
}
else if (f.id === defs.ConnectionBlocked) {
connection.emit('blocked', f.fields.reason);
}
else if (f.id === defs.ConnectionUnblocked) {
connection.emit('unblocked');
}
else if (f.id === defs.ConnectionUpdateSecretOk) {
connection.emit('update-secret-ok');
}
else {
connection.closeWithError(
fmt("Unexpected frame on channel 0"),
constants.UNEXPECTED_FRAME,
new Error(fmt("Unexpected frame on channel 0: %s",
inspect(f, false))));
}
};
}
function invalidOp(msg, stack) {
return function() {
throw new IllegalOperationError(msg, stack);
};
}
function invalidateSend(conn, msg, stack) {
conn.sendMethod = conn.sendContent = conn.sendMessage =
invalidOp(msg, stack);
}
var encodeMethod = defs.encodeMethod;
var encodeProperties = defs.encodeProperties;
var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;
var parseFrame = frame.parseFrame;
var decodeFrame = frame.decodeFrame;
function wrapStream(s) {
if (s instanceof Duplex) return s;
else {
var ws = new Duplex();
ws.wrap(s); //wraps the readable side of things
ws._write = function(chunk, encoding, callback) {
return s.write(chunk, encoding, callback);
};
return ws;
}
}
function isFatalError(error) {
switch (error && error.code) {
case defs.constants.CONNECTION_FORCED:
case defs.constants.REPLY_SUCCESS:
return false;
default:
return true;
}
}
module.exports.Connection = Connection;
module.exports.isFatalError = isFatalError;