-
Notifications
You must be signed in to change notification settings - Fork 476
/
frame.js
113 lines (92 loc) · 3.31 KB
/
frame.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// The river sweeps through
// Silt and twigs, gravel and leaves
// Driving the wheel on
'use strict';
var defs = require('./defs');
var constants = defs.constants;
var decode = defs.decode;
var Bits = require('@acuminous/bitsyntax');
module.exports.PROTOCOL_HEADER = "AMQP" + String.fromCharCode(0, 0, 9, 1);
/*
Frame format:
0 1 3 7 size+7 size+8
+------+---------+-------------+ +------------+ +-----------+
| type | channel | size | | payload | | frame-end |
+------+---------+-------------+ +------------+ +-----------+
octet short long size octets octet
In general I want to know those first three things straight away, so I
can discard frames early.
*/
// framing constants
var FRAME_METHOD = constants.FRAME_METHOD,
FRAME_HEARTBEAT = constants.FRAME_HEARTBEAT,
FRAME_HEADER = constants.FRAME_HEADER,
FRAME_BODY = constants.FRAME_BODY,
FRAME_END = constants.FRAME_END;
var bodyCons =
Bits.builder(FRAME_BODY,
'channel:16, size:32, payload:size/binary',
FRAME_END);
// %%% TESTME possibly better to cons the first bit and write the
// second directly, in the absence of IO lists
module.exports.makeBodyFrame = function(channel, payload) {
return bodyCons({channel: channel, size: payload.length, payload: payload});
};
var frameHeaderPattern = Bits.matcher('type:8', 'channel:16',
'size:32', 'rest/binary');
function parseFrame(bin, max) {
var fh = frameHeaderPattern(bin);
if (fh) {
var size = fh.size, rest = fh.rest;
if (size > max) {
throw new Error('Frame size exceeds frame max');
}
else if (rest.length > size) {
if (rest[size] !== FRAME_END)
throw new Error('Invalid frame');
return {
type: fh.type,
channel: fh.channel,
size: size,
payload: rest.slice(0, size),
rest: rest.slice(size + 1)
};
}
}
return false;
}
module.exports.parseFrame = parseFrame;
var headerPattern = Bits.matcher('class:16',
'_weight:16',
'size:64',
'flagsAndfields/binary');
var methodPattern = Bits.matcher('id:32, args/binary');
var HEARTBEAT = {channel: 0};
module.exports.decodeFrame = function(frame) {
var payload = frame.payload;
switch (frame.type) {
case FRAME_METHOD:
var idAndArgs = methodPattern(payload);
var id = idAndArgs.id;
var fields = decode(id, idAndArgs.args);
return {id: id, channel: frame.channel, fields: fields};
case FRAME_HEADER:
var parts = headerPattern(payload);
var id = parts['class'];
var fields = decode(id, parts.flagsAndfields);
return {id: id, channel: frame.channel,
size: parts.size, fields: fields};
case FRAME_BODY:
return {channel: frame.channel, content: frame.payload};
case FRAME_HEARTBEAT:
return HEARTBEAT;
default:
throw new Error('Unknown frame type ' + frame.type);
}
}
// encoded heartbeat
module.exports.HEARTBEAT_BUF = Buffer.from([constants.FRAME_HEARTBEAT,
0, 0, 0, 0, // size = 0
0, 0, // channel = 0
constants.FRAME_END]);
module.exports.HEARTBEAT = HEARTBEAT;