From 9cf9041eba78fd6089966c5589c1836d6d65c4a4 Mon Sep 17 00:00:00 2001 From: Benjamin Pannell Date: Thu, 9 Oct 2014 22:00:43 +0200 Subject: [PATCH] Massive insert performance optimizations Still need to do some more optimizations for onRetrieved to further reduce overhead, but it's back to acceptable levels. --- lib/Model.js | 161 +++++++++++++++++++------------------------------- package.json | 5 +- test/hooks.js | 24 ++------ 3 files changed, 69 insertions(+), 121 deletions(-) diff --git a/lib/Model.js b/lib/Model.js index b09ee26..cbfe3cc 100644 --- a/lib/Model.js +++ b/lib/Model.js @@ -1,5 +1,5 @@ var _ = require('lodash'), - async = require('async-q'), + async = require('async'), debug = require('debug')('iridium:Model'), ObjectID = require('mongodb').ObjectID, EventEmitter = require('events').EventEmitter, @@ -228,8 +228,6 @@ Model.prototype.onRetrieved = function(conditions, results, wrapper, options) { var $ = this; - var deferred = Q.defer(); - wrapper = wrapper || this.wrap.bind(this); options = options || {}; @@ -242,106 +240,83 @@ Model.prototype.onRetrieved = function(conditions, results, wrapper, options) { var returnArray = Array.isArray(results); if(!returnArray) results = [results]; - function doHook(hook, target) { - if(!hook) return Q(); + function doHook(hook, target, next) { + if(!hook) return next(); if(hook.length === 0) { try { hook.call(target); - return Q(); + return next(); } catch(err) { - return Q.reject(err); + return next(err); } } else { - var q = Q.defer(); - hook.call(target, function(err) { - if(err) return q.reject(err); - return q.resolve(); - }); - return q.promise; + hook.call(target, next); } } + var deferred = Q.defer(); + async.parallel(_.map(results, function(target) { - return (function() { - return doHook(this.options.hooks.retrieved, target).then((function() { + return (function(done) { + doHook(this.options.hooks.retrieved, target, (function(err) { + if(err) return done(err); + this.emit('retrieved', target); - return Q(); - }).bind(this)).then((function() { - if(!options.cache || options.partial) return Q(); - - var q = Q.defer(); - var cacheDoc = _.cloneDeep(target); - - this.cache.store(conditions, cacheDoc, function() { - q.resolve(); - }); - - return q.promise; - }).bind(this)).then((function() { + + if(this.cache && options.cache && !options.partial) { + var cacheDoc = _.cloneDeep(target); + this.cache.store(conditions, cacheDoc, function() { }); + } + var wrapped = target; if(options.wrap) wrapped = wrapper(target, false, options.partial); else this.fromSource(wrapped); - return Q(wrapped); - }).bind(this)).then((function(wrapped) { - return doHook(this.options.hooks.ready, wrapped).then(function() { return Q(wrapped); }); + doHook(this.options.hooks.ready, wrapped, (function(err) { + if(err) return done(err); + + return done(null, wrapped); + }).bind(this)); }).bind(this)); }).bind(this); - }, this)).then(function(output) { - if(returnArray) - return deferred.resolve(output); - else - return deferred.resolve(output[0]); - }, (function(err) { - this.emit('error', err); - return deferred.reject(err); - }).bind(this)); + }, this), function(err, results) { + if(err) return deferred.reject(err); + if(returnArray) return deferred.resolve(results); + return deferred.resolve(results[0]); + }); - return deferred.promise; + return deferred.promise; }; -Model.prototype.onCreating = function(document) { +Model.prototype.onCreating = function(documents) { /// ///Handles any pre-creation hooks for the model - ///The document being created - prior to any transformations being applied + ///The documents being created - prior to any transformations being applied /// - function doHook(hook, target, args) { - if(!hook) return Q(); - if(hook.length === 0) { - try { - hook.apply(target, args); - return Q(); - } catch(err) { - return Q.reject(err); - } - } else { - var q = Q.defer(); - args.push(function(err) { - if(err) return q.reject(err); - return q.resolve(); - }); - - hook.apply(target, args); - return q.promise; - } + function doHook(hook, target) { + if(!hook) return; + if(hook.length > 0) throw new Error('creating hook does not support async for performance reasons'); + hook.call(target); } - var sent = false; - return doHook(this.options.hooks.creating || this.options.hooks.beforeCreate, document, []).then((function() { - if(!sent) { - sent = true; + for(var i = 0; i < documents.length; i++) { + var document = documents[i]; + try { this.emit('creating', document); + doHook(this.options.hooks.creating || this.options.hooks.beforeCreate, document); + var validation = this.schemaValidator.validate(document); + if(validation.failed) return Q.reject(validation.error); + this.toSource(document); + } catch(err) { + return Q.reject(err); } - return Q(document); - }).bind(this), (function(err) { - this.emit('error', err); - return Q.reject(err); - }).bind(this)); - + } + + return Q(documents); }; @@ -521,7 +496,7 @@ Model.prototype.findOne = Model.prototype.get = fn.first(function() { var deferred = Q.defer(); if(this.options.cache && this.context.cache && this.context.cache.valid(this.conditions)) - return Q.nbind(this.context.cache.fetch, this.context.cache)(this.conditions).then(function(doc) { if(doc) return Q(doc); return Q.reject(null); }); + return Q.ninvoke(this.context.cache, "fetch", this.conditions).then(function(doc) { if(doc) return Q(doc); return Q.reject(null); }); else return Q.reject(); return deferred.promise; @@ -530,9 +505,7 @@ Model.prototype.findOne = Model.prototype.get = fn.first(function() { .then((function(doc) { return this.context.onRetrieved(this.conditions, doc, null, { wrap: this.options.wrap, cache: this.options.cache, partial: !!this.options.fields }); }).bind(this), (function(err) { - var findOne = Q.nbind(this.context.collection.findOne, this.context.collection); - - return findOne(this.conditions, { sort: this.options.sort, skip: this.options.skip, fields: this.options.fields }).then((function(doc) { + return Q.ninvoke(this.context.collection, "findOne", this.conditions, { sort: this.options.sort, skip: this.options.skip, fields: this.options.fields }).then((function(doc) { if(!doc) return Q(doc); return this.context.onRetrieved(this.conditions, doc, null, { wrap: this.options.wrap, cache: this.options.cache, partial: !!this.options.fields }); }).bind(this)); @@ -546,8 +519,8 @@ Model.prototype.insert = Model.prototype.create = fn.first(function() { this.deferred = Q.defer(); }) .on(Object, fn.gobble(), function(object) { - this.args[0] = [object]; this.returnArray = false; + this.args[0] = [object]; this.retry(); }) .on([Object], fn.opt(Function), function(objects, callback) { @@ -573,28 +546,18 @@ Model.prototype.insert = Model.prototype.create = fn.first(function() { if(callback) promiseCallback(this.deferred.promise, callback); }).then(function() { var queryOptions = { w: this.options.w, upsert: !!this.options.upsert, new: true }; - - promisePipe(async.series(_.map(this.objects, function(object) { - return (function() { - return this.context.onCreating(object).then((function(object) { - var validation = this.context.schemaValidator.validate(object); - if(validation.failed) return Q.reject(validation.error); - return Q(object); - }).bind(this)).then((function(object) { - this.context.toSource(object); - return Q(object); - }).bind(this)); - }).bind(this); - }, this)).then((function(objects) { - if(queryOptions.upsert) return async.parallel(_.map(objects, function(object) { - return (function() { - return Q.nbind(this.context.collection.findAndModify, this.context.collection)({ _id: object._id }, { _id: 1 }, object, queryOptions).then(function(result) { + + promisePipe(Q().then((function() { + return this.context.onCreating(this.objects); + }).bind(this)).then((function(objects) { + if(queryOptions.upsert) return Q.ninvoke(async, "parallel", _.map(objects, function(object) { + return (function(done) { + Q.ninvoke(this.context.collection, "findAndModify", { _id: object._id }, { _id: 1 }, object, queryOptions).then(function(result) { return Q(result[0]); - }); + }).nodeify(done); }).bind(this); - }, this)) - ; - else return Q.nbind(this.context.collection.insert, this.context.collection)(objects, queryOptions); + }, this)); + else return Q.ninvoke(this.context.collection, "insert", objects, queryOptions); }).bind(this)).then((function(inserted) { return this.context.onRetrieved(null, inserted, null, { wrap: this.options.wrap, cache: this.options.cache }); }).bind(this)).then((function(results) { @@ -650,7 +613,7 @@ Model.prototype.count = fn.first(function() { this.deferred = Q.defer(); this.count = Q.nbind(this.context.collection.count, this.context.collection); }).on(fn.not(Object), fn.gobble(), function(conditions, callback) { - this.args[0] = this.context.downstreamID(conditions); + this.args[0] = this.context.downstreamID(id); this.retry(); }).on(fn.opt(Function), function(callback) { this.conditions = {}; @@ -675,7 +638,7 @@ Model.prototype.remove = fn.first(function() { this.cacheDrop = Q.nbind(this.context.cache.drop, this.context.cache); this.remove = Q.nbind(this.context.collection.remove, this.context.collection); }).on(fn.not(Object), fn.gobble(), function(conditions, callback) { - this.args[0] = this.context.downstreamID(conditions); + this.args[0] = this.context.downstreamID(id); this.retry(); }).on(fn.opt(Function), function(callback) { this.conditions = {}; diff --git a/package.json b/package.json index 46f7991..0719563 100644 --- a/package.json +++ b/package.json @@ -23,12 +23,11 @@ "mongodb": "~1.4", "skmatc": "1.x", "lodash": "*", - "async": "~0.9", "concoction": "~1.0", "debug": "1.x", "q": "1.x", - "async-q": "~0.2", - "functionality": "1.x" + "async": "~0.9", + "functionality": "2.x" }, "devDependencies": { "mocha": "*", diff --git a/test/hooks.js b/test/hooks.js index dfc6fea..431727f 100644 --- a/test/hooks.js +++ b/test/hooks.js @@ -41,18 +41,17 @@ describe('hooks', function() { }); }); - it('should correctly call the asynchronous overload', function() { + it('should not support an asynchronous overload', function() { var hookCalled = false; var model = createModel(function(done) { - this.created = new Date(); - setTimeout(function() { hookCalled = true; done(); }, 1); + should.fail('asynchronous overload should not be supported'); }); return model.insert({ data: 'Testing' }).then(function(created) { - should.exist(created); - should.exist(created.created); - hookCalled.should.be.true; + should.fail('asynchronous overload should not be supported'); + }, function(err) { + return Q(); }); }); @@ -68,19 +67,6 @@ describe('hooks', function() { return Q(); }); }); - - it('should convey errors in the asynchronous overload', function() { - var model = createModel(function(done) { - done(Error('Should fail')); - }); - - return model.insert({ data: 'Testing' }).then(function(inserted) { - should.fail(); - }, function(err) { - err.message.should.eql('Should fail'); - return Q(); - }); - }); }); describe('retrieved', function() {