-
Notifications
You must be signed in to change notification settings - Fork 156
/
segment_emitter.js
193 lines (165 loc) · 5.14 KB
/
segment_emitter.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
var dgram = require('dgram');
var batcher = require('atomic-batcher');
var logger = require('./logger');
// Header line placed in front of every formatted segment payload
// (used by SegmentEmitter.format and SegmentEmitter.send).
var PROTOCOL_HEADER = '{"format":"json","version":1}';
// Separator inserted between the protocol header and the segment body.
var PROTOCOL_DELIMITER = '\n';
/**
 * Flushes a batch of pending sends through one temporary UDP socket.
 * Intended to be handed to `atomic-batcher` so that several data blocks
 * can share a single socket instead of opening one per message.
 *
 * @param {object} ops - Collection of queued send descriptors
 * @param {Function} callback - Invoked once the whole batch has been processed
 */
function batchSendData (ops, callback) {
  var socket = dgram.createSocket('udp4');

  // Runs after the last entry has been handled: the socket is released
  // first, and the batch is reported as done even if closing throws.
  function releaseSocket () {
    try {
      socket.close();
    } finally {
      callback();
    }
  }

  executeSendData(socket, ops, 0, releaseSocket);
}
/**
 * Walks through `ops` one entry at a time, beginning at `index`, sending
 * each entry with the given client. The next entry is only started once
 * the previous send has reported completion.
 *
 * @param {Socket} client - Socket to send data with
 * @param {object} ops - Details of data to send
 * @param {number} index - Position in `ops` to start sending from
 * @param {Function} callback - Called after the final entry has been processed
 */
function executeSendData (client, ops, index, callback) {
  var exhausted = index >= ops.length;

  if (!exhausted) {
    // Continue with the following entry only after this one completes.
    sendMessage(client, ops[index], function onMessageDone () {
      executeSendData(client, ops, index + 1, callback);
    });
    return;
  }

  callback();
}
/**
 * Send a single message over a UDP socket.
 *
 * @param {Socket} client - Socket to send data with
 * @param {object} data - Details of the data to send: `msg`, `offset`,
 *   `length`, `port`, `address` and an optional per-message `callback`
 * @param {Function} batchCallback - Function to call when done, whether or
 *   not the send succeeded
 */
function sendMessage (client, data, batchCallback) {
  var callback = data.callback;

  client.send(data.msg, data.offset, data.length, data.port, data.address, function (err) {
    try {
      // The per-message callback is optional, as it is for Socket.send.
      // Calling it unguarded would throw a TypeError from inside the send
      // completion handler when the caller did not supply one.
      if (typeof callback === 'function') {
        callback(err);
      }
    } finally {
      // Always advance the batch, even if the caller's callback throws.
      batchCallback();
    }
  });
}
/**
 * Stand-in for a UDP client Socket that batches outgoing sends. Work is
 * queued through `atomic-batcher` and flushed by `batchSendData`, which
 * opens a temporary socket for the batch and closes it afterwards.
 */
function BatchingTemporarySocket() {
  var enqueue = batcher(batchSendData);
  this.batchSend = enqueue;
}

/**
 * Mirrors the signature of Socket.send. Rather than sending immediately,
 * the arguments are recorded as one unit of work and queued so that
 * several sends can share a temporary UDP socket.
 */
BatchingTemporarySocket.prototype.send = function (msg, offset, length, port, address, callback) {
  this.batchSend({
    msg: msg,
    offset: offset,
    length: length,
    port: port,
    address: address,
    callback: callback
  });
};
/**
 * Segment emitter module.
 * @module SegmentEmitter
 */
var SegmentEmitter = {
  daemonConfig: require('./daemon_config'),

  /**
   * Returns the formatted segment JSON string: the protocol header, the
   * delimiter, and the segment's string form.
   * @param {Segment} segment - The segment to format.
   * @returns {string} The header-prefixed payload.
   */
  format: function format(segment) {
    return PROTOCOL_HEADER + PROTOCOL_DELIMITER + segment.toString();
  },

  /**
   * Sends the formatted segment to the daemon over UDP. The socket is created
   * on first use and cached on `this.socket`: a BatchingTemporarySocket when
   * `disableReusableSocket` has been called, otherwise an unref'd `udp4` socket.
   * Send failures are logged, not thrown.
   * @param {Segment} segment - The segment to send to the daemon.
   */
  send: function send(segment) {
    if (!this.socket) {
      if (this.useBatchingTemporarySocket) {
        this.socket = new BatchingTemporarySocket();
      } else {
        this.socket = dgram.createSocket('udp4').unref();
      }
    }

    var client = this.socket;
    var formatted = segment.format();
    var data = PROTOCOL_HEADER + PROTOCOL_DELIMITER + formatted;
    var message = Buffer.from(data);

    // Compact identifier used in log lines. The closing quote of the
    // "trace_id" key must come before the colon so the logged text is
    // well-formed JSON.
    var short = '{"trace_id":"' + segment.trace_id + '","id":"' + segment.id + '"}';
    var type = segment.type === 'subsegment' ? 'Subsegment' : 'Segment';

    client.send(message, 0, message.length, this.daemonConfig.udp_port, this.daemonConfig.udp_ip, function(err) {
      if (err) {
        if (err.code === 'EMSGSIZE') {
          // Reported separately so the log shows which segment was too big.
          logger.getLogger().error(type + ' too large to send: ' + short + ' (' + message.length + ' bytes).');
        } else {
          logger.getLogger().error('Error occured sending segment: ', err);
        }
      } else {
        logger.getLogger().debug(type + ' sent: ' + short);
        logger.getLogger().debug('UDP message sent: ' + segment);
      }
    });
  },

  /**
   * Configures the address and/or port the daemon is expected to be on.
   * Delegates to `daemonConfig.setDaemonAddress`.
   * @param {string} address - Address of the daemon the segments should be sent to. Should be formatted as an IPv4 address.
   * @module SegmentEmitter
   * @function setDaemonAddress
   */
  setDaemonAddress: function setDaemonAddress(address) {
    this.daemonConfig.setDaemonAddress(address);
  },

  /**
   * Get the UDP IP the emitter is configured to.
   * @module SegmentEmitter
   * @function getIp
   */
  getIp: function getIp() {
    return this.daemonConfig.udp_ip;
  },

  /**
   * Get the UDP port the emitter is configured to.
   * @module SegmentEmitter
   * @function getPort
   */
  getPort: function getPort() {
    return this.daemonConfig.udp_port;
  },

  /**
   * Forces the segment emitter to create a new socket on send, and close it on complete.
   * Only sets a flag that `send` reads when it creates its socket, so it has
   * no effect on a socket that `send` has already created and cached.
   * @module SegmentEmitter
   * @function disableReusableSocket
   */
  disableReusableSocket: function() {
    this.useBatchingTemporarySocket = true;
  }
};

module.exports = SegmentEmitter;