Skip to content

Commit

Permalink
Massive insert performance optimizations
Browse files Browse the repository at this point in the history
Still need to do some more optimizations for onRetrieved to further
reduce overhead, but it's back to acceptable levels.
  • Loading branch information
notheotherben committed Oct 9, 2014
1 parent 1e699fc commit 9cf9041
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 121 deletions.
161 changes: 62 additions & 99 deletions lib/Model.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
var _ = require('lodash'),
async = require('async-q'),
async = require('async'),
debug = require('debug')('iridium:Model'),
ObjectID = require('mongodb').ObjectID,
EventEmitter = require('events').EventEmitter,
Expand Down Expand Up @@ -228,8 +228,6 @@ Model.prototype.onRetrieved = function(conditions, results, wrapper, options) {

var $ = this;

var deferred = Q.defer();

wrapper = wrapper || this.wrap.bind(this);
options = options || {};

Expand All @@ -242,106 +240,83 @@ Model.prototype.onRetrieved = function(conditions, results, wrapper, options) {
var returnArray = Array.isArray(results);
if(!returnArray) results = [results];

function doHook(hook, target) {
if(!hook) return Q();
function doHook(hook, target, next) {
if(!hook) return next();

if(hook.length === 0) {
try {
hook.call(target);
return Q();
return next();
} catch(err) {
return Q.reject(err);
return next(err);
}
} else {
var q = Q.defer();
hook.call(target, function(err) {
if(err) return q.reject(err);
return q.resolve();
});
return q.promise;
hook.call(target, next);
}
}

var deferred = Q.defer();

async.parallel(_.map(results, function(target) {
return (function() {
return doHook(this.options.hooks.retrieved, target).then((function() {
return (function(done) {
doHook(this.options.hooks.retrieved, target, (function(err) {
if(err) return done(err);

this.emit('retrieved', target);
return Q();
}).bind(this)).then((function() {
if(!options.cache || options.partial) return Q();

var q = Q.defer();
var cacheDoc = _.cloneDeep(target);

this.cache.store(conditions, cacheDoc, function() {
q.resolve();
});

return q.promise;
}).bind(this)).then((function() {

if(this.cache && options.cache && !options.partial) {
var cacheDoc = _.cloneDeep(target);
this.cache.store(conditions, cacheDoc, function() { });
}

var wrapped = target;

if(options.wrap) wrapped = wrapper(target, false, options.partial);
else this.fromSource(wrapped);

return Q(wrapped);
}).bind(this)).then((function(wrapped) {
return doHook(this.options.hooks.ready, wrapped).then(function() { return Q(wrapped); });
doHook(this.options.hooks.ready, wrapped, (function(err) {
if(err) return done(err);

return done(null, wrapped);
}).bind(this));
}).bind(this));
}).bind(this);
}, this)).then(function(output) {
if(returnArray)
return deferred.resolve(output);
else
return deferred.resolve(output[0]);
}, (function(err) {
this.emit('error', err);
return deferred.reject(err);
}).bind(this));
}, this), function(err, results) {
if(err) return deferred.reject(err);
if(returnArray) return deferred.resolve(results);
return deferred.resolve(results[0]);
});

return deferred.promise;
return deferred.promise;
};


Model.prototype.onCreating = function(document) {
Model.prototype.onCreating = function(documents) {
///<signature>
///<summary>Handles any pre-creation hooks for the model</summary>
///<param name="document" type="Object">The document being created - prior to any transformations being applied</param>
///<param name="document" type="Array" elementType="Object">The documents being created - prior to any transformations being applied</param>
///</signature>

function doHook(hook, target, args) {
if(!hook) return Q();
if(hook.length === 0) {
try {
hook.apply(target, args);
return Q();
} catch(err) {
return Q.reject(err);
}
} else {
var q = Q.defer();
args.push(function(err) {
if(err) return q.reject(err);
return q.resolve();
});

hook.apply(target, args);
return q.promise;
}
function doHook(hook, target) {
if(!hook) return;
if(hook.length > 0) throw new Error('creating hook does not support async for performance reasons');
hook.call(target);
}

var sent = false;
return doHook(this.options.hooks.creating || this.options.hooks.beforeCreate, document, []).then((function() {
if(!sent) {
sent = true;
for(var i = 0; i < documents.length; i++) {
var document = documents[i];
try {
this.emit('creating', document);
doHook(this.options.hooks.creating || this.options.hooks.beforeCreate, document);
var validation = this.schemaValidator.validate(document);
if(validation.failed) return Q.reject(validation.error);
this.toSource(document);
} catch(err) {
return Q.reject(err);
}
return Q(document);
}).bind(this), (function(err) {
this.emit('error', err);
return Q.reject(err);
}).bind(this));

}

return Q(documents);
};


Expand Down Expand Up @@ -521,7 +496,7 @@ Model.prototype.findOne = Model.prototype.get = fn.first(function() {
var deferred = Q.defer();

if(this.options.cache && this.context.cache && this.context.cache.valid(this.conditions))
return Q.nbind(this.context.cache.fetch, this.context.cache)(this.conditions).then(function(doc) { if(doc) return Q(doc); return Q.reject(null); });
return Q.ninvoke(this.context.cache, "fetch", this.conditions).then(function(doc) { if(doc) return Q(doc); return Q.reject(null); });
else return Q.reject();

return deferred.promise;
Expand All @@ -530,9 +505,7 @@ Model.prototype.findOne = Model.prototype.get = fn.first(function() {
.then((function(doc) {
return this.context.onRetrieved(this.conditions, doc, null, { wrap: this.options.wrap, cache: this.options.cache, partial: !!this.options.fields });
}).bind(this), (function(err) {
var findOne = Q.nbind(this.context.collection.findOne, this.context.collection);

return findOne(this.conditions, { sort: this.options.sort, skip: this.options.skip, fields: this.options.fields }).then((function(doc) {
return Q.ninvoke(this.context.collection, "findOne", this.conditions, { sort: this.options.sort, skip: this.options.skip, fields: this.options.fields }).then((function(doc) {
if(!doc) return Q(doc);
return this.context.onRetrieved(this.conditions, doc, null, { wrap: this.options.wrap, cache: this.options.cache, partial: !!this.options.fields });
}).bind(this));
Expand All @@ -546,8 +519,8 @@ Model.prototype.insert = Model.prototype.create = fn.first(function() {
this.deferred = Q.defer();
})
.on(Object, fn.gobble(), function(object) {
this.args[0] = [object];
this.returnArray = false;
this.args[0] = [object];
this.retry();
})
.on([Object], fn.opt(Function), function(objects, callback) {
Expand All @@ -573,28 +546,18 @@ Model.prototype.insert = Model.prototype.create = fn.first(function() {
if(callback) promiseCallback(this.deferred.promise, callback);
}).then(function() {
var queryOptions = { w: this.options.w, upsert: !!this.options.upsert, new: true };

promisePipe(async.series(_.map(this.objects, function(object) {
return (function() {
return this.context.onCreating(object).then((function(object) {
var validation = this.context.schemaValidator.validate(object);
if(validation.failed) return Q.reject(validation.error);
return Q(object);
}).bind(this)).then((function(object) {
this.context.toSource(object);
return Q(object);
}).bind(this));
}).bind(this);
}, this)).then((function(objects) {
if(queryOptions.upsert) return async.parallel(_.map(objects, function(object) {
return (function() {
return Q.nbind(this.context.collection.findAndModify, this.context.collection)({ _id: object._id }, { _id: 1 }, object, queryOptions).then(function(result) {

promisePipe(Q().then((function() {
return this.context.onCreating(this.objects);
}).bind(this)).then((function(objects) {
if(queryOptions.upsert) return Q.ninvoke(async, "parallel", _.map(objects, function(object) {
return (function(done) {
Q.ninvoke(this.context.collection, "findAndModify", { _id: object._id }, { _id: 1 }, object, queryOptions).then(function(result) {
return Q(result[0]);
});
}).nodeify(done);
}).bind(this);
}, this))
;
else return Q.nbind(this.context.collection.insert, this.context.collection)(objects, queryOptions);
}, this));
else return Q.ninvoke(this.context.collection, "insert", objects, queryOptions);
}).bind(this)).then((function(inserted) {
return this.context.onRetrieved(null, inserted, null, { wrap: this.options.wrap, cache: this.options.cache });
}).bind(this)).then((function(results) {
Expand Down Expand Up @@ -650,7 +613,7 @@ Model.prototype.count = fn.first(function() {
this.deferred = Q.defer();
this.count = Q.nbind(this.context.collection.count, this.context.collection);
}).on(fn.not(Object), fn.gobble(), function(conditions, callback) {
this.args[0] = this.context.downstreamID(conditions);
this.args[0] = this.context.downstreamID(id);
this.retry();
}).on(fn.opt(Function), function(callback) {
this.conditions = {};
Expand All @@ -675,7 +638,7 @@ Model.prototype.remove = fn.first(function() {
this.cacheDrop = Q.nbind(this.context.cache.drop, this.context.cache);
this.remove = Q.nbind(this.context.collection.remove, this.context.collection);
}).on(fn.not(Object), fn.gobble(), function(conditions, callback) {
this.args[0] = this.context.downstreamID(conditions);
this.args[0] = this.context.downstreamID(id);
this.retry();
}).on(fn.opt(Function), function(callback) {
this.conditions = {};
Expand Down
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
"mongodb": "~1.4",
"skmatc": "1.x",
"lodash": "*",
"async": "~0.9",
"concoction": "~1.0",
"debug": "1.x",
"q": "1.x",
"async-q": "~0.2",
"functionality": "1.x"
"async": "~0.9",
"functionality": "2.x"
},
"devDependencies": {
"mocha": "*",
Expand Down
24 changes: 5 additions & 19 deletions test/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,17 @@ describe('hooks', function() {
});
});

it('should correctly call the asynchronous overload', function() {
it('should not support an asynchronous overload', function() {
var hookCalled = false;

var model = createModel(function(done) {
this.created = new Date();
setTimeout(function() { hookCalled = true; done(); }, 1);
should.fail('asynchronous overload should not be supported');
});

return model.insert({ data: 'Testing' }).then(function(created) {
should.exist(created);
should.exist(created.created);
hookCalled.should.be.true;
should.fail('asynchronous overload should not be supported');
}, function(err) {
return Q();
});
});

Expand All @@ -68,19 +67,6 @@ describe('hooks', function() {
return Q();
});
});

it('should convey errors in the asynchronous overload', function() {
var model = createModel(function(done) {
done(Error('Should fail'));
});

return model.insert({ data: 'Testing' }).then(function(inserted) {
should.fail();
}, function(err) {
err.message.should.eql('Should fail');
return Q();
});
});
});

describe('retrieved', function() {
Expand Down

0 comments on commit 9cf9041

Please sign in to comment.