-
Notifications
You must be signed in to change notification settings - Fork 454
/
connection.js
506 lines (424 loc) · 16.5 KB
/
connection.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
var Doc, Query;
if (typeof require !== 'undefined') {
Doc = require('./doc').Doc;
Query = require('./query').Query;
MicroEvent = require('./microevent');
} else {
Doc = exports.Doc;
Query = exports.Query
}
/**
 * Client-side endpoint for a ShareJS server. Hands out documents and queries.
 *
 * Typical use: wrap a socket
 *   connection = new sharejs.Connection(socket)
 * (any object that speaks the websocket protocol will do, see bindToSocket()),
 * wait until the connection is ready
 *   connection.on('connected', ...)
 * and then work with shared documents
 *   connection.get('food', 'steak') // Doc
 *
 * @param socket @see bindToSocket
 */
var Connection = exports.Connection = function (socket) {
  // collection name -> document name -> Doc. There must be at most one Doc
  // object per (collection, name) pair.
  this.collections = {};

  // Every query gets an id; the server quotes that id whenever it sends us
  // something about the query (results, updates, ...).
  this.nextQueryId = 1;

  // query id -> Query object.
  this.queries = {};

  // Current state of the connection. An event with the same name is emitted
  // whenever it changes. Possible values:
  // - 'connecting'   The socket is open, but we have not been given a client
  //                  id yet.
  // - 'connected'    We have received our client id. Ready for data.
  // - 'disconnected' The socket is closed, but it will reconnect automatically.
  // - 'stopped'      The socket is closed and should not reconnect.
  this.state = 'disconnected';

  // Convenience flag for documents: true while we are 'connected', or while
  // 'connecting' over a transport (browserchannel) that can already send.
  this.canSend = false;

  // Clears the client id, the last error, the sequence counter and the
  // remembered last sent / received document.
  this.reset();

  this.debug = false;

  // Rolling log of the most recent messages (capped at 100 by the code that
  // appends to it) so there is something to look at when an error occurs.
  this.messageBuffer = [];

  this.bindToSocket(socket);
};
// Give Connection its event API: the rest of this file calls this.emit(), and
// the usage notes refer to connection.on().
// NOTE(review): presumably mixin() copies the emitter methods onto
// Connection.prototype - confirm in ./microevent.
MicroEvent.mixin(Connection);
/**
* Use socket to communicate with server
*
* Socket is an object that can handle the websocket protocol. This method
* installs the onopen, onclose, onmessage and onerror handlers on the socket to
* handle communication and sends messages by calling socket.send(msg). The
* sockets `readyState` property is used to determine the initaial state.
*
* @param socket Handles the websocket protocol
* @param socket.readyState
* @param socket.close
* @param socket.send
* @param socket.onopen
* @param socket.onclose
* @param socket.onmessage
* @param socket.onerror
*/
Connection.prototype.bindToSocket = function(socket) {
if (this.socket) {
delete this.socket.onopen
delete this.socket.onclose
delete this.socket.onmessage
delete this.socket.onerror
}
// TODO: Check that the socket is in the 'connecting' state.
this.socket = socket;
// This logic is replicated in setState - consider calling setState here
// instead.
this.state = (socket.readyState === 0 || socket.readyState === 1) ? 'connecting' : 'disconnected';
this.canSend = this.state === 'connecting' && socket.canSendWhileConnecting;
var connection = this
socket.onmessage = function(msg) {
var data = msg.data;
// Fall back to supporting old browserchannel 1.x API which implemented the
// websocket API incorrectly. This will be removed at some point
if (!data) data = msg;
// Some transports don't need parsing.
if (typeof data === 'string') data = JSON.parse(data);
if (connection.debug) console.log('RECV', JSON.stringify(data));
connection.messageBuffer.push({
t: (new Date()).toTimeString(),
recv:JSON.stringify(data)
});
while (connection.messageBuffer.length > 100) {
connection.messageBuffer.shift();
}
try {
connection.handleMessage(data);
} catch (e) {
connection.emit('error', e);
// We could also restart the connection here, although that might result
// in infinite reconnection bugs.
throw e;
}
}
socket.onopen = function() {
connection._setState('connecting');
};
socket.onerror = function(e) {
// This isn't the same as a regular error, because it will happen normally
// from time to time. Your connection should probably automatically
// reconnect anyway, but that should be triggered off onclose not onerror.
// (onclose happens when onerror gets called anyway).
connection.emit('connection error', e);
};
socket.onclose = function(reason) {
connection._setState('disconnected', reason);
if (reason === 'Closed' || reason === 'Stopped by server') {
connection._setState('stopped', reason);
}
};
};
/**
 * Dispatch one decoded server message.
 *
 * - 'init' stores the client id and moves the connection to 'connected'.
 * - 'qfetch', 'qsub', 'q' and 'qunsub' go to the query registered under
 *   msg.id; messages for unknown queries are dropped.
 * - 'bs' is a bulk subscribe response; every contained document gets its
 *   _handleSubscribe() called.
 * - Everything else is forwarded to the document the message refers to.
 *
 * @param {object} msg
 * @param {String} msg.a action
 * @throws {Error} if an 'init' message carries a protocol version other than
 *   0 or a client id that is not a string
 */
Connection.prototype.handleMessage = function(msg) {
  switch (msg.a) {
    case 'init':
      // Client initialization packet. It contains our client id.
      if (msg.protocol !== 0) throw new Error('Invalid protocol version');
      if (typeof msg.id != 'string') throw new Error('Invalid client id');

      this.id = msg.id;
      this._setState('connected');
      break;

    case 'qfetch':
    case 'qsub':
    case 'q':
    case 'qunsub':
      // Query message. Pass this to the appropriate query object.
      var query = this.queries[msg.id];
      if (query) query._onMessage(msg);
      break;

    case 'bs':
      // Bulk subscribe response: collection name -> doc name -> response.
      var result = msg.s;
      for (var cName in result) {
        for (var name in result[cName]) {
          var target = this.get(cName, name);
          if (!target) {
            if (typeof console !== 'undefined') {
              console.error('Message for unknown doc. Ignoring.', msg);
            }
            // Skip just this document, not the rest of the collection.
            continue;
          }

          // Kept in its own variable so the incoming message is not
          // overwritten while we are still iterating over it.
          var response = result[cName][name];
          if (response !== null && typeof response === 'object') {
            target._handleSubscribe(response.error, response);
          } else {
            // The response will be true if we simply resubscribed. (null is
            // handled here too, because typeof null is 'object'.)
            target._handleSubscribe(null, null);
          }
        }
      }
      break;

    default:
      // Document message. Pull out the referenced document and forward the
      // message.
      var collection, docName;
      if (msg.d) {
        collection = this._lastReceivedCollection = msg.c;
        docName = this._lastReceivedDoc = msg.d;
      } else {
        // No document given: the message is about the same document as the
        // previous one. Fill the fields back in before forwarding.
        collection = msg.c = this._lastReceivedCollection;
        docName = msg.d = this._lastReceivedDoc;
      }

      this.get(collection, docName)._onMessage(msg);
  }
};
/**
 * Forget everything tied to the current session: the remembered last sent and
 * last received document, the last error and the client id are set to null and
 * the sequence counter goes back to 1.
 */
Connection.prototype.reset = function() {
  this._lastSentDoc = null;
  this._lastSentCollection = null;
  this._lastReceivedDoc = null;
  this._lastReceivedCollection = null;
  this.lastError = null;
  this.id = null;

  this.seq = 1;
};
/**
 * Set the connection's state. The connection is basically a state machine.
 *
 * Does nothing when the connection is already in newState. Otherwise it
 * updates state and canSend, emits an event named after the new state and
 * then notifies every document and every query, in that order.
 *
 * @param {String} newState 'connecting', 'connected', 'disconnected' or
 *   'stopped'
 * @param [data] passed along to the emitted event and to every
 *   _onConnectionStateChanged() call
 * @throws {Error} on an invalid transition (see below)
 * @private
 */
Connection.prototype._setState = function(newState, data) {
if (this.state === newState) return;
// The only invalid transitions are getting to 'connecting' from anywhere
// other than 'disconnected' or 'stopped', and getting to 'connected' from
// anywhere other than 'connecting'.
if ((newState === 'connecting' && (this.state !== 'disconnected' && this.state !== 'stopped'))
|| (newState === 'connected' && this.state !== 'connecting')) {
throw new Error("Cannot transition directly from " + this.state + " to " + newState);
}
this.state = newState;
// We can send once connected, or already while connecting if the socket
// says so.
this.canSend = (newState === 'connecting' && this.socket.canSendWhileConnecting) || newState === 'connected';
// A disconnect invalidates the client id and the other per-session fields.
if (newState === 'disconnected') this.reset();
this.emit(newState, data);
// Emit the event to all documents & queries. It might make sense for
// documents to just register for this stuff using events, but that couples
// connections and documents a bit much. It's not a big deal either way.
//
// While opQueue is an array, sendOp() collects ops instead of sending them,
// and between bsStart() and bsEnd() sendSubscribe() collects subscriptions
// instead of sending them.
// NOTE(review): if a document's handler throws, opQueue and subscribeData are
// left set, so later sendOp() / sendSubscribe() calls keep collecting and
// nothing flushes them - confirm whether that needs guarding.
this.opQueue = [];
this.bsStart();
for (var c in this.collections) {
var collection = this.collections[c];
for (var docName in collection) {
collection[docName]._onConnectionStateChanged(newState, data);
}
}
// It's important that operations are resent in the same order that they were
// originally sent. If we don't sort, an op with a high sequence number will
// convince the server not to accept any ops with earlier sequence numbers.
this.opQueue.sort(function(a, b) { return a.seq - b.seq; });
for (var i = 0; i < this.opQueue.length; i++) {
this.send(this.opQueue[i]);
}
// Back to sending ops directly.
this.opQueue = null;
// Sends the collected subscriptions, if any, as one bulk message.
this.bsEnd();
// It's important that query resubscribes are sent after documents to make
// sure the server knows all the documents we're subscribed to when it issues
// the queries internally.
// No bulk subscribe for queries yet.
for (var id in this.queries) {
this.queries[id]._onConnectionStateChanged(newState, data);
}
};
/**
 * Send an operation message, or park it while a state change is in progress.
 *
 * There is a nasty case where the client sends two requests (which fail) and
 * then reconnects: the documents may be told about the new state in the
 * "wrong" order, the operations go out with reversed sequence numbers and the
 * server wrongly rejects the second one. So while _setState() is notifying
 * the documents (opQueue is an array) the ops are only collected here;
 * _setState() sorts and sends them afterwards.
 *
 * @param data the operation message
 */
Connection.prototype.sendOp = function(data) {
  var pending = this.opQueue;
  if (!pending) {
    this.send(data);
    return;
  }
  pending.push(data);
};
/**
 * Begin collecting subscriptions for a bulk subscribe. Until bsEnd() is
 * called, sendSubscribe() records requests in subscribeData instead of
 * sending them one by one.
 */
Connection.prototype.bsStart = function() {
  var collected = {};
  this.subscribeData = collected;
};
/**
 * Finish a bulk subscribe: if anything was collected since bsStart(), send it
 * as a single 'bs' message. Afterwards subscribeData is cleared, so
 * sendSubscribe() sends individual messages again.
 */
Connection.prototype.bsEnd = function() {
  var collected = this.subscribeData;

  // Plain "is there at least one key" check.
  var hasEntries = false;
  for (var key in collected) {
    hasEntries = true;
    break;
  }

  if (hasEntries) {
    this.send({a:'bs', s:collected});
  }

  this.subscribeData = null;
};
/**
 * Called by the document class when a document wants to subscribe.
 *
 * We could always send a 'sub' message, but during reconnect that causes a
 * huge number of messages over browserchannel. So while a bulk subscribe is
 * being collected (between bsStart() and bsEnd()) the request is only
 * recorded in subscribeData, similar to what sendOp() does for operations.
 *
 * @param collection collection name of the document
 * @param name document name
 * @param [v] version to subscribe at
 */
Connection.prototype.sendSubscribe = function(collection, name, v) {
  var pending = this.subscribeData;

  if (pending) {
    // Own-property check: a collection called e.g. "constructor" or
    // "toString" must not be mistaken for the member every object inherits
    // from Object.prototype - the doc name would end up on that built-in and
    // never make it into the bulk message.
    if (!Object.prototype.hasOwnProperty.call(pending, collection) || !pending[collection]) {
      pending[collection] = {};
    }
    // NOTE(review): a version of 0 is recorded as null here, whereas the
    // direct path below sends it as 0 - confirm that this is intended.
    pending[collection][name] = v || null;
  } else {
    var msg = {a:'sub', c:collection, d:name};
    if (v != null) msg.v = v;
    this.send(msg);
  }
};
/**
 * Sends a message down the socket.
 *
 * The message is recorded in messageBuffer (capped at the 100 most recent
 * entries) and, in debug mode, logged to the console. A document message
 * addressed to the same document as the previous one has its `c` and `d`
 * fields removed before it goes out; note that this modifies the object that
 * was passed in. Unless the socket declares canSendJSON the message is
 * serialized to a JSON string first.
 *
 * @param {object} msg
 */
Connection.prototype.send = function(msg) {
  var snapshot = JSON.stringify(msg);
  if (this.debug) console.log("SEND", snapshot);

  var log = this.messageBuffer;
  log.push({t:Date.now(), send:snapshot});
  var excess = log.length - 100;
  if (excess > 0) log.splice(0, excess);

  // msg.d is the document the message refers to. Not set for queries.
  if (msg.d) {
    var repeatsLastDoc = msg.c === this._lastSentCollection &&
        msg.d === this._lastSentDoc;
    if (repeatsLastDoc) {
      delete msg.c;
      delete msg.d;
    } else {
      this._lastSentCollection = msg.c;
      this._lastSentDoc = msg.d;
    }
  }

  var payload = this.socket.canSendJSON ? msg : JSON.stringify(msg);
  this.socket.send(payload);
};
/**
 * Closes the underlying socket. The 'disconnected' state change follows when
 * the socket invokes the onclose handler installed by bindToSocket().
 */
Connection.prototype.disconnect = function() {
  var socket = this.socket;
  socket.close();
};
/**
 * Look up a document without creating it. Logs a deprecation trace on every
 * call.
 *
 * @deprecated Use get() instead
 * @param collection
 * @param name
 * @return the document, or undefined if there is none
 */
Connection.prototype.getExisting = function(collection, name) {
  console.trace('getExisting is deprecated. Use get() instead');

  var docs = this.collections[collection];
  return docs ? docs[name] : undefined;
};
/**
 * Old name of get(). Logs a deprecation trace and then does exactly what
 * get() does.
 *
 * @deprecated Use get() instead
 * @param collection
 * @param name
 * @param [data]
 * @return whatever get() returns
 */
Connection.prototype.getOrCreate = function(collection, name, data) {
  console.trace('getOrCreate is deprecated. Use get() instead');

  var doc = this.get(collection, name, data);
  return doc;
};
/**
 * Get or create a document.
 *
 * The document is looked up in this.collections; if it is not there yet a new
 * Doc is created and registered, so repeated calls with the same collection
 * and name return the same object.
 *
 * @param collection
 * @param name
 * @param [data] ingested into the document if it has no state yet and
 *   data.data is defined
 * @return {Doc}
 */
Connection.prototype.get = function(collection, name, data) {
  // Only entries we stored ourselves count. A plain lookup would also find
  // what every object inherits from Object.prototype, so a collection or
  // document called "constructor", "toString", ... would hand back a built-in
  // function instead of a Doc.
  var owns = Object.prototype.hasOwnProperty;

  var collectionObject = owns.call(this.collections, collection) ?
      this.collections[collection] : undefined;
  if (!collectionObject)
    collectionObject = this.collections[collection] = {};

  var doc = owns.call(collectionObject, name) ? collectionObject[name] : undefined;
  if (!doc)
    doc = collectionObject[name] = new Doc(this, collection, name);

  // Even if the document isn't new, it's possible the document was created
  // manually and then tried to be re-created with data (suppose a query
  // returns with data for the document). We should hydrate the document
  // immediately if we can because the query callback will expect the document
  // to have data.
  if (data && data.data !== undefined && !doc.state)
    doc.ingestData(data);

  return doc;
};
/**
 * Remove a document from this.collections. Does nothing if the document's
 * collection is not registered.
 *
 * @param doc document with `collection` and `name` properties
 * @private
 */
Connection.prototype._destroyDoc = function(doc) {
  var docs = this.collections[doc.collection];
  if (docs) {
    delete docs[doc.name];

    // Drop the container as well once it is empty. Otherwise slowly creating
    // a huge number of collections would leak memory - unlikely, but cheap to
    // avoid.
    if (!hasKeys(docs)) {
      delete this.collections[doc.collection];
    }
  }
};
/**
 * Tell whether for...in would visit at least one property of the object, i.e.
 * whether it has an enumerable property (own or inherited).
 *
 * @param object value to inspect; null and undefined count as empty
 * @return {boolean}
 */
function hasKeys(object) {
  var found = false;
  for (var key in object) {
    found = true;
    break;
  }
  return found;
}
/**
 * Shared implementation of createFetchQuery() and createSubscribeQuery().
 * Assigns the next query id, registers the new query under it and only then
 * executes it.
 *
 * @param {String} type 'fetch' or 'sub'
 * @param collection
 * @param q the query itself
 * @param [options] defaults to an empty object
 * @param [callback]
 * @return {Query}
 * @throws {Error} for any other type
 */
Connection.prototype._createQuery = function(type, collection, q, options, callback) {
  var knownType = type === 'fetch' || type === 'sub';
  if (!knownType) {
    throw new Error('Invalid query type: ' + type);
  }

  var id = this.nextQueryId++;
  var query = new Query(type, this, id, collection, q, options || {}, callback);

  // Register before executing so the query can already be found by its id.
  this.queries[id] = query;
  query._execute();

  return query;
};
/**
 * Unregister a query from this connection. Internal - call query.destroy() to
 * remove a query.
 *
 * @param query query with an `id` property
 * @private
 */
Connection.prototype._destroyQuery = function(query) {
  var queryId = query.id;
  delete this.queries[queryId];
};
// The query options object can contain the following fields:
//
// docMode: What to do with documents that are in the result set. Can be
// null/undefined (default), 'fetch' or 'subscribe'. Fetch mode indicates
// that the server should send document snapshots to the client for all query
// results. These will be hydrated into the document objects before the query
// result callbacks are returned. Subscribe mode gets document snapshots and
// automatically subscribes the client to all results. Note that the
// documents *WILL NOT* be automatically unsubscribed when the query is
// destroyed. (ShareJS doesn't have enough information to do that safely).
// Beware of memory leaks when using this option.
//
// poll: Forcibly enable or disable polling mode. Polling mode will reissue the query
// every time anything in the collection changes (!!) so it's quite
// expensive. It is automatically enabled for paginated and sorted queries.
// By default queries run with polling mode disabled; which will only check
// changed documents to test if they now match the specified query.
// Set to false to disable polling mode, or true to enable it. If you don't
// specify a poll option, polling mode is enabled or disabled automatically
// by the query's backend.
//
// backend: Set the backend source for the query. You can attach different
// query backends to livedb and pick which one the query should hit using
// this parameter.
//
// results: (experimental) Initial list of resultant documents. This is
// useful for rehydrating queries when you're using autoFetch / autoSubscribe
// so the server doesn't have to send over snapshots for documents the client
// already knows about. This is experimental - the API may change in upcoming
// versions.
/**
 * Create a fetch query. A fetch query is issued only once and hands its
 * results straight to the callback.
 *
 * @param index what to query; its meaning depends on the source - with
 *   mongodb it is the collection the query is made against
 * @param q the query itself
 * @param [options] query options
 * @param callback function(error, results, extraData), where results is a
 *   list of Doc objects
 * @return {Query}
 */
Connection.prototype.createFetchQuery = function(index, q, options, callback) {
  var query = this._createQuery('fetch', index, q, options, callback);
  return query;
};
/**
 * Create a subscribe query. A subscribe query delivers the initial results
 * through the callback and afterwards keeps itself up to date, announcing
 * changes to the result set through its own event emitter.
 *
 * @param index what to query, as for createFetchQuery()
 * @param q the query itself
 * @param [options] query options
 * @param [callback] function(error, results, extraData), where results is a
 *   list of Doc objects
 * @return {Query}
 */
Connection.prototype.createSubscribeQuery = function(index, q, options, callback) {
  var query = this._createQuery('sub', index, q, options, callback);
  return query;
};