Skip to content
This repository has been archived by the owner on Feb 4, 2022. It is now read-only.

Commit

Permalink
feat(op-compressed): add support for OP_COMPRESSED to new sdam impl
Browse files Browse the repository at this point in the history
  • Loading branch information
mbroadst committed Jun 20, 2018
1 parent f289226 commit 8deec9b
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 115 deletions.
196 changes: 95 additions & 101 deletions lib/sdam/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class Cursor {
}

// Default pool
const pool = this.server.s.pool;
const pool = this.s.server.s.pool;

// Execute command
this.s.server.s.wireProtocolHandler.killCursor(this.bson, this.ns, this.s, pool, callback);
Expand Down Expand Up @@ -540,6 +540,44 @@ function _setCursorNotifiedImpl(cursor, callback) {
return callback();
}

function initializeCursorAndRetryNext(cursor, callback) {
cursor.topology.selectServer(
readPreferenceServerSelector(cursor.options.readPreference || ReadPreference.primary),
(err, server) => {
if (err) {
callback(err, null);
return;
}

cursor.s.server = server;
cursor.s.init = true;

// check if server supports collation
// NOTE: this should be a part of the selection predicate!
if (cursor.cmd && cursor.cmd.collation && cursor.server.description.maxWireVersion < 5) {
callback(new MongoError(`server ${cursor.server.name} does not support collation`));
return;
}

try {
cursor.query = cursor.s.server.s.wireProtocolHandler.command(
cursor.bson,
cursor.ns,
cursor.cmd,
cursor.s,
cursor.topology,
cursor.options
);

nextFunction(cursor, callback);
} catch (err) {
callback(err);
return;
}
}
);
}

function nextFunction(cursor, callback) {
// We have notified about it
if (cursor.s.notified) {
Expand All @@ -557,40 +595,7 @@ function nextFunction(cursor, callback) {

// We have just started the cursor
if (!cursor.s.init) {
cursor.topology.selectServer(
readPreferenceServerSelector(cursor.options.readPreference || ReadPreference.primary),
(err, server) => {
if (err) {
callback(err, null);
return;
}

cursor.s.server = server;
cursor.s.init = true;

// Server does not support server
// NOTE: this should be a part of the selection predicate!
if (cursor.cmd && cursor.cmd.collation && cursor.server.description.maxWireVersion < 5) {
callback(new MongoError(`server ${cursor.server.name} does not support collation`));
return;
}

try {
cursor.query = cursor.s.server.s.wireProtocolHandler.command(
cursor.bson,
cursor.ns,
cursor.cmd,
cursor.s,
cursor.topology,
cursor.options
);
} catch (err) {
console.dir(err);
callback(err);
return;
}
}
);
return initializeCursorAndRetryNext(cursor, callback);
}

// If we don't have a cursorId execute the first query
Expand All @@ -599,15 +604,8 @@ function nextFunction(cursor, callback) {
// execute the query against the db
if (isConnectionDead(cursor, callback)) return;

// Check if topology is destroyed
// if (cursor.topology.isDestroyed()) {
// return callback(
// new MongoNetworkError('connection destroyed, not possible to instantiate cursor')
// );
// }

// query, cmd, options, s, callback
_find(cursor, function(err) {
return _find(cursor, function(err) {
if (err) return handleCallback(callback, err, null);

if (cursor.s.cursorId && cursor.s.cursorId.isZero() && cursor._endSession) {
Expand All @@ -626,32 +624,46 @@ function nextFunction(cursor, callback) {

nextFunction(cursor, callback);
});
} else if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) {
}

if (cursor.s.documents.length === cursor.s.cursorIndex && Long.ZERO.equals(cursor.s.cursorId)) {
setCursorDeadAndNotified(cursor, callback);
return;
}

if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) {
// Ensure we kill the cursor on the server
cursor.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(cursor, callback);
} else if (
cursor.s.cursorIndex === cursor.s.documents.length &&
!Long.ZERO.equals(cursor.s.cursorId)
setCursorDeadAndNotified(cursor, callback);
return;
}

if (
cursor.s.documents.length === cursor.s.cursorIndex &&
cursor.cmd.tailable &&
Long.ZERO.equals(cursor.s.cursorId)
) {
return handleCallback(
callback,
new MongoError({
message: 'No more documents in tailed cursor',
tailable: cursor.cmd.tailable,
awaitData: cursor.cmd.awaitData
})
);
}

if (cursor.s.cursorIndex === cursor.s.documents.length && !Long.ZERO.equals(cursor.s.cursorId)) {
// Ensure an empty cursor state
cursor.s.documents = [];
cursor.s.cursorIndex = 0;

// Check if topology is destroyed
// if (cursor.topology.isDestroyed()) {
// return callback(
// new MongoNetworkError('connection destroyed, not possible to instantiate cursor')
// );
// }

// Check if connection is dead and return if not possible to
// execute a getmore on this connection
if (isConnectionDead(cursor, callback)) return;

// Execute the next get more
_getmore(cursor, function(err, doc, connection) {
return _getmore(cursor, function(err, doc, connection) {
if (err) {
if (err instanceof MongoError) {
err[mongoErrorContextSymbol].isGetMore = true;
Expand Down Expand Up @@ -698,56 +710,38 @@ function nextFunction(cursor, callback) {

nextFunction(cursor, callback);
});
} else if (
cursor.s.documents.length === cursor.s.cursorIndex &&
cursor.cmd.tailable &&
Long.ZERO.equals(cursor.s.cursorId)
) {
return handleCallback(
callback,
new MongoError({
message: 'No more documents in tailed cursor',
tailable: cursor.cmd.tailable,
awaitData: cursor.cmd.awaitData
})
);
} else if (
cursor.s.documents.length === cursor.s.cursorIndex &&
Long.ZERO.equals(cursor.s.cursorId)
) {
setCursorDeadAndNotified(cursor, callback);
} else {
if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) {
// Ensure we kill the cursor on the server
cursor.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(cursor, callback);
}
}

// Increment the current cursor limit
cursor.s.currentLimit += 1;
if (cursor.s.limit > 0 && cursor.s.currentLimit >= cursor.s.limit) {
// Ensure we kill the cursor on the server
cursor.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(cursor, callback);
}

// Get the document
var doc = cursor.s.documents[cursor.s.cursorIndex++];
// Increment the current cursor limit
cursor.s.currentLimit += 1;

// Doc overflow
if (!doc || doc.$err) {
// Ensure we kill the cursor on the server
cursor.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(cursor, function() {
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
});
}
// Get the document
let doc = cursor.s.documents[cursor.s.cursorIndex++];

// Transform the doc with passed in transformation method if provided
if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') {
doc = cursor.s.transforms.doc(doc);
}
// Doc overflow
if (!doc || doc.$err) {
// Ensure we kill the cursor on the server
cursor.kill();
// Set cursor in dead and notified state
return setCursorDeadAndNotified(cursor, function() {
handleCallback(callback, new MongoError(doc ? doc.$err : undefined));
});
}

// Return the document
handleCallback(callback, null, doc);
// Transform the doc with passed in transformation method if provided
if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') {
doc = cursor.s.transforms.doc(doc);
}

// Return the document
handleCallback(callback, null, doc);
}

module.exports = Cursor;
26 changes: 17 additions & 9 deletions lib/sdam/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class Server extends EventEmitter {
BSON.Timestamp
]),
// client metadata for the initial handshake
clientInfo: createClientInfo(options)
clientInfo: createClientInfo(options),
};
}

Expand Down Expand Up @@ -86,8 +86,8 @@ class Server extends EventEmitter {

// Set up listeners
this.s.pool.on('connect', connectEventHandler(this));
this.s.pool.on('close', closeEventHandler(this));

// this.s.pool.on('close', closeEventHandler(this));
// this.s.pool.on('error', errorEventHandler(this));
// this.s.pool.on('timeout', timeoutEventHandler(this));
// this.s.pool.on('parseError', errorEventHandler(this));
Expand Down Expand Up @@ -345,8 +345,8 @@ function extractIsMasterError(err, result) {
function executeServerHandshake(server, callback) {
// construct an `ismaster` query
const compressors =
server.s.compression && server.s.compression.compressors
? server.s.compression.compressors
server.s.options.compression && server.s.options.compression.compressors
? server.s.options.compression.compressors
: [];

const queryOptions = { numberToSkip: 0, numberToReturn: -1, checkKeys: false, slaveOk: true };
Expand Down Expand Up @@ -403,15 +403,17 @@ function connectEventHandler(server) {

// compression negotation
if (isMaster && isMaster.compression) {
for (var i = 0; i < server.s.compression.compressors.length; i++) {
if (isMaster.compression.indexOf(server.s.compression.compressors[i]) > -1) {
server.s.pool.options.agreedCompressor = server.s.compression.compressors[i];
const localCompressionInfo = server.s.options.compression;
const localCompressors = localCompressionInfo.compressors;
for (var i = 0; i < localCompressors.length; i++) {
if (isMaster.compression.indexOf(localCompressors[i]) > -1) {
server.s.pool.options.agreedCompressor = localCompressors[i];
break;
}
}

if (server.s.compression.zlibCompressionLevel) {
server.s.pool.options.zlibCompressionLevel = server.s.compression.zlibCompressionLevel;
if (localCompressionInfo.zlibCompressionLevel) {
server.s.pool.options.zlibCompressionLevel = localCompressionInfo.zlibCompressionLevel;
}
}

Expand All @@ -437,4 +439,10 @@ function connectEventHandler(server) {
};
}

function closeEventHandler(server) {
return function() {
server.emit('close');
};
}

module.exports = Server;
7 changes: 2 additions & 5 deletions lib/sdam/topology.js
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ function updateServers(topology, currentServerDescription) {

function serverConnectEventHandler(server, topology) {
return function(/* ismaster */) {
topology.emit('connect', server);
topology.emit('connect', topology);
};
}

Expand Down Expand Up @@ -623,7 +623,4 @@ function executeWriteOperation(args, options, callback) {
* @type {ServerHeartbeatSucceededEvent}
*/

module.exports = {
Topology,
ServerDescription
};
module.exports = Topology;

0 comments on commit 8deec9b

Please sign in to comment.