From 8deec9b278b7893b53e6c2c3db3f20d46f17393d Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Fri, 1 Jun 2018 09:54:17 -0400 Subject: [PATCH] feat(op-compressed): add support for OP_COMPRESSED to new sdam impl --- lib/sdam/cursor.js | 196 +++++++++++++++++++++---------------------- lib/sdam/server.js | 26 ++++-- lib/sdam/topology.js | 7 +- 3 files changed, 114 insertions(+), 115 deletions(-) diff --git a/lib/sdam/cursor.js b/lib/sdam/cursor.js index 8467b22bd..851603729 100644 --- a/lib/sdam/cursor.js +++ b/lib/sdam/cursor.js @@ -241,7 +241,7 @@ class Cursor { } // Default pool - const pool = this.server.s.pool; + const pool = this.s.server.s.pool; // Execute command this.s.server.s.wireProtocolHandler.killCursor(this.bson, this.ns, this.s, pool, callback); @@ -540,6 +540,44 @@ function _setCursorNotifiedImpl(cursor, callback) { return callback(); } +function initializeCursorAndRetryNext(cursor, callback) { + cursor.topology.selectServer( + readPreferenceServerSelector(cursor.options.readPreference || ReadPreference.primary), + (err, server) => { + if (err) { + callback(err, null); + return; + } + + cursor.s.server = server; + cursor.s.init = true; + + // check if server supports collation + // NOTE: this should be a part of the selection predicate! + if (cursor.cmd && cursor.cmd.collation && cursor.server.description.maxWireVersion < 5) { + callback(new MongoError(`server ${cursor.server.name} does not support collation`)); + return; + } + + try { + cursor.query = cursor.s.server.s.wireProtocolHandler.command( + cursor.bson, + cursor.ns, + cursor.cmd, + cursor.s, + cursor.topology, + cursor.options + ); + + nextFunction(cursor, callback); + } catch (err) { + callback(err); + return; + } + } + ); +} + function nextFunction(cursor, callback) { // We have notified about it if (cursor.s.notified) { @@ -557,40 +595,7 @@ function nextFunction(cursor, callback) { // We have just started the cursor if (!cursor.s.init) { - cursor.topology.selectServer( - readPreferenceServerSelector(cursor.options.readPreference || ReadPreference.primary), - (err, server) => { - if (err) { - callback(err, null); - return; - } - - cursor.s.server = server; - cursor.s.init = true; - - // Server does not support server - // NOTE: this should be a part of the selection predicate! - if (cursor.cmd && cursor.cmd.collation && cursor.server.description.maxWireVersion < 5) { - callback(new MongoError(`server ${cursor.server.name} does not support collation`)); - return; - } - - try { - cursor.query = cursor.s.server.s.wireProtocolHandler.command( - cursor.bson, - cursor.ns, - cursor.cmd, - cursor.s, - cursor.topology, - cursor.options - ); - } catch (err) { - console.dir(err); - callback(err); - return; - } - } - ); + return initializeCursorAndRetryNext(cursor, callback); } // If we don't have a cursorId execute the first query @@ -599,15 +604,8 @@ function nextFunction(cursor, callback) { // execute the query against the db if (isConnectionDead(cursor, callback)) return; - // Check if topology is destroyed - // if (cursor.topology.isDestroyed()) { - // return callback( - // new MongoNetworkError('connection destroyed, not possible to instantiate cursor') - // ); - // } - // query, cmd, options, s, callback - _find(cursor, function(err) { + return _find(cursor, function(err) { if (err) return handleCallback(callback, err, null); if (cursor.s.cursorId && cursor.s.cursorId.isZero() && cursor._endSession) { @@ -626,32 +624,46 @@ function nextFunction(cursor, callback) { nextFunction(cursor, callback); }); - } else if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) { + } + + if (cursor.s.documents.length === cursor.s.cursorIndex && Long.ZERO.equals(cursor.s.cursorId)) { + setCursorDeadAndNotified(cursor, callback); + return; + } + + if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) { // Ensure we kill the cursor on the server cursor.kill(); // Set cursor in dead and notified state - return setCursorDeadAndNotified(cursor, callback); - } else if ( - cursor.s.cursorIndex === cursor.s.documents.length && - !Long.ZERO.equals(cursor.s.cursorId) + setCursorDeadAndNotified(cursor, callback); + return; + } + + if ( + cursor.s.documents.length === cursor.s.cursorIndex && + cursor.cmd.tailable && + Long.ZERO.equals(cursor.s.cursorId) ) { + return handleCallback( + callback, + new MongoError({ + message: 'No more documents in tailed cursor', + tailable: cursor.cmd.tailable, + awaitData: cursor.cmd.awaitData + }) + ); + } + + if (cursor.s.cursorIndex === cursor.s.documents.length && !Long.ZERO.equals(cursor.s.cursorId)) { // Ensure an empty cursor state cursor.s.documents = []; cursor.s.cursorIndex = 0; - // Check if topology is destroyed - // if (cursor.topology.isDestroyed()) { - // return callback( - // new MongoNetworkError('connection destroyed, not possible to instantiate cursor') - // ); - // } - // Check if connection is dead and return if not possible to - // execute a getmore on this connection if (isConnectionDead(cursor, callback)) return; // Execute the next get more - _getmore(cursor, function(err, doc, connection) { + return _getmore(cursor, function(err, doc, connection) { if (err) { if (err instanceof MongoError) { err[mongoErrorContextSymbol].isGetMore = true; @@ -698,56 +710,38 @@ function nextFunction(cursor, callback) { nextFunction(cursor, callback); }); - } else if ( - cursor.s.documents.length === cursor.s.cursorIndex && - cursor.cmd.tailable && - Long.ZERO.equals(cursor.s.cursorId) - ) { - return handleCallback( - callback, - new MongoError({ - message: 'No more documents in tailed cursor', - tailable: cursor.cmd.tailable, - awaitData: cursor.cmd.awaitData - }) - ); - } else if ( - cursor.s.documents.length === cursor.s.cursorIndex && - Long.ZERO.equals(cursor.s.cursorId) - ) { - setCursorDeadAndNotified(cursor, callback); - } else { - if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) { - // Ensure we kill the cursor on the server - cursor.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(cursor, callback); - } + } - // Increment the current cursor limit - cursor.s.currentLimit += 1; + if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) { + // Ensure we kill the cursor on the server + cursor.kill(); + // Set cursor in dead and notified state + return setCursorDeadAndNotified(cursor, callback); + } - // Get the document - var doc = cursor.s.documents[cursor.s.cursorIndex++]; + // Increment the current cursor limit + cursor.s.currentLimit += 1; - // Doc overflow - if (!doc || doc.$err) { - // Ensure we kill the cursor on the server - cursor.kill(); - // Set cursor in dead and notified state - return setCursorDeadAndNotified(cursor, function() { - handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); - }); - } + // Get the document + let doc = cursor.s.documents[cursor.s.cursorIndex++]; - // Transform the doc with passed in transformation method if provided - if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') { - doc = cursor.s.transforms.doc(doc); - } + // Doc overflow + if (!doc || doc.$err) { + // Ensure we kill the cursor on the server + cursor.kill(); + // Set cursor in dead and notified state + return setCursorDeadAndNotified(cursor, function() { + handleCallback(callback, new MongoError(doc ? doc.$err : undefined)); + }); + } - // Return the document - handleCallback(callback, null, doc); + // Transform the doc with passed in transformation method if provided + if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') { + doc = cursor.s.transforms.doc(doc); } + + // Return the document + handleCallback(callback, null, doc); } module.exports = Cursor; diff --git a/lib/sdam/server.js b/lib/sdam/server.js index f7720ffba..b3f1b6a82 100644 --- a/lib/sdam/server.js +++ b/lib/sdam/server.js @@ -56,7 +56,7 @@ class Server extends EventEmitter { BSON.Timestamp ]), // client metadata for the initial handshake - clientInfo: createClientInfo(options) + clientInfo: createClientInfo(options), }; } @@ -86,8 +86,8 @@ class Server extends EventEmitter { // Set up listeners this.s.pool.on('connect', connectEventHandler(this)); + this.s.pool.on('close', closeEventHandler(this)); - // this.s.pool.on('close', closeEventHandler(this)); // this.s.pool.on('error', errorEventHandler(this)); // this.s.pool.on('timeout', timeoutEventHandler(this)); // this.s.pool.on('parseError', errorEventHandler(this)); @@ -345,8 +345,8 @@ function extractIsMasterError(err, result) { function executeServerHandshake(server, callback) { // construct an `ismaster` query const compressors = - server.s.compression && server.s.compression.compressors - ? server.s.compression.compressors + server.s.options.compression && server.s.options.compression.compressors + ? server.s.options.compression.compressors : []; const queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true }; @@ -403,15 +403,17 @@ function connectEventHandler(server) { // compression negotation if (isMaster && isMaster.compression) { - for (var i = 0; i < server.s.compression.compressors.length; i++) { - if (isMaster.compression.indexOf(server.s.compression.compressors[i]) > -1) { - server.s.pool.options.agreedCompressor = server.s.compression.compressors[i]; + const localCompressionInfo = server.s.options.compression; + const localCompressors = localCompressionInfo.compressors; + for (var i = 0; i < localCompressors.length; i++) { + if (isMaster.compression.indexOf(localCompressors[i]) > -1) { + server.s.pool.options.agreedCompressor = localCompressors[i]; break; } } - if (server.s.compression.zlibCompressionLevel) { - server.s.pool.options.zlibCompressionLevel = server.s.compression.zlibCompressionLevel; + if (localCompressionInfo.zlibCompressionLevel) { + server.s.pool.options.zlibCompressionLevel = localCompressionInfo.zlibCompressionLevel; } } @@ -437,4 +439,10 @@ function connectEventHandler(server) { }; } +function closeEventHandler(server) { + return function() { + server.emit('close'); + }; +} + module.exports = Server; diff --git a/lib/sdam/topology.js b/lib/sdam/topology.js index 0ce77a0fd..d3634c6be 100644 --- a/lib/sdam/topology.js +++ b/lib/sdam/topology.js @@ -499,7 +499,7 @@ function updateServers(topology, currentServerDescription) { function serverConnectEventHandler(server, topology) { return function(/* ismaster */) { - topology.emit('connect', server); + topology.emit('connect', topology); }; } @@ -623,7 +623,4 @@ function executeWriteOperation(args, options, callback) { * @type {ServerHeartbeatSucceededEvent} */ -module.exports = { - Topology, - ServerDescription -}; +module.exports = Topology;