From 6a271ace73b38012aaa846121c3e18ea94b0d6e9 Mon Sep 17 00:00:00 2001 From: Tom Kirkpatrick Date: Mon, 22 Aug 2016 10:59:23 +0200 Subject: [PATCH] Add contact-importer helper --- .../contact-importer/contact-importer.js | 133 ++++++++++++++++++ package.json | 10 +- .../contact-importer/contact-importer.test.js | 64 +++++++++ 3 files changed, 206 insertions(+), 1 deletion(-) create mode 100644 lib/helpers/contact-importer/contact-importer.js create mode 100644 test/helpers/contact-importer/contact-importer.test.js diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js new file mode 100644 index 000000000..a20b6fffc --- /dev/null +++ b/lib/helpers/contact-importer/contact-importer.js @@ -0,0 +1,133 @@ +/* eslint dot-notation: 'off' */ +'use strict'; + +var Bottleneck = require('bottleneck'); +var EventEmitter = require('events').EventEmitter; +var chunk = require('lodash.chunk'); +var debug = require('debug')('sendgrid'); +var util = require('util'); +var queue = require('async.queue'); +var ensureAsync = require('async.ensureasync'); + +var ContactImporter = module.exports = function(sg, options) { + options = options || {}; + var self = this; + this.sg = sg; + this.pendingItems = []; + + // Number of items to send per batch. + this.batchSize = options.batchSize || 1500; + + // Max number of requests per rate limit period. + this.rateLimitLimit = options.rateLimitLimit || 3; + + // Length of rate limit period (miliseconds). + this.rateLimitPeriod = options.rateLimitPeriod || 2000; + + // Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms. + this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit); + + // Create a queue that wil be used to send batches to the throttler. + this.queue = queue(ensureAsync(this._worker)); + + // When the last batch is removed from the queue, add any incomplete batches. + this.queue.empty = function() { + if (self.pendingItems.length) { + debug('adding %s items from deferrd queue for processing', self.pendingItems.length); + var batch = self.pendingItems.splice(0); + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + self._notify(error, JSON.parse(result.body), batch); + }); + } + }; + + // Emit an event when the queue is drained. + this.queue.drain = function() { + self.emit('drain'); + }; +}; +util.inherits(ContactImporter, EventEmitter); + +/** + * Add a new contact, or an array of contact, to the end of the queue. + * + * @param {Array|Object} data A contact or array of contacts. + */ +ContactImporter.prototype.push = function(data) { + var self = this; + data = Array.isArray(data) ? data : [data]; + + // Add the new items onto the pending items. + var itemsToProcess = this.pendingItems.concat(data); + + // Chunk the pending items into batches and add onto the queue + var batches = chunk(itemsToProcess, this.batchSize); + debug('generated batches %s from %s items', batches.length, data.length); + + batches.forEach(function(batch) { + // If this batch is full or the queue is empty queue it for processing. + if (batch.length === self.batchSize || !self.queue.length) { + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + self._notify(error, JSON.parse(result.body), batch); + }); + } + // Otherwise, it store it for later. + else { + debug('the last batch with only %s item is deferred (partial batch)', batch.length); + self.pendingItems = batch; + } + }); + + debug('batches in queue: %s', this.queue.length); + debug('items in deferred queue: %s', this.pendingItems.length); +}; + +/** + * Send a batch of contacts to Sendgrid. + * + * @param {Object} task Task to be processed (data in 'data' property) + * @param {Function} callback Callback function. + */ +ContactImporter.prototype._worker = function(task, callback) { + var context = task.owner; + debug('processing batch (%s items)', task.data.length); + context.throttle.submit(context._sendBatch, context, task.data, callback); +}; + +ContactImporter.prototype._sendBatch = function(context, data, callback) { + debug('sending batch (%s items)', data.length); + + var request = context.sg.emptyRequest(); + request.method = 'POST'; + request.path = '/v3/contactdb/recipients'; + request.body = data; + + context.sg.API(request) + .then(function(response) { + debug('got response: %o', response); + return callback(null, response); + }) + .catch(function(error) { + debug('got error, %o', error); + return callback(error); + }); +}; + +/** + * Emit the result of processing a batch. + * + * @param {Object} error + * @param {Object} result + */ +ContactImporter.prototype._notify = function(error, result, batch) { + if (error) { + return this.emit('error', result, batch); + } + return this.emit('success', result, batch); +}; diff --git a/package.json b/package.json index f51a88e65..b8a864e44 100644 --- a/package.json +++ b/package.json @@ -22,12 +22,20 @@ "node": ">= 0.4.7" }, "dependencies": { + "async.ensureasync": "^0.5.2", + "async.queue": "^0.5.2", + "bottleneck": "^1.12.0", + "lodash.chunk": "^4.2.0", "sendgrid-rest": "^2.2.1" }, "devDependencies": { "chai": "^3.5.0", + "debug": "^2.2.0", "eslint": "^3.1.0", - "mocha": "^2.4.5" + "mocha": "^2.4.5", + "mocha-sinon": "^1.1.5", + "sinon": "^1.17.5", + "sinon-chai": "^2.8.0" }, "scripts": { "lint": "eslint . --fix", diff --git a/test/helpers/contact-importer/contact-importer.test.js b/test/helpers/contact-importer/contact-importer.test.js new file mode 100644 index 000000000..38ce3b98d --- /dev/null +++ b/test/helpers/contact-importer/contact-importer.test.js @@ -0,0 +1,64 @@ +var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js') +var sendgrid = require('../../../') + +var chai = require('chai') +var sinon = require('sinon') + +chai.should() +var expect = chai.expect +chai.use(require('sinon-chai')) + +require('mocha-sinon') + +describe.only('test_contact_importer', function() { + beforeEach(function() { + // Create a new SendGrid instance. + var API_KEY = process.env.API_KEY + var sg = sendgrid(API_KEY) + + // Create a new importer with a batch size of 2. + this.contactImporter = new ContactImporter(sg, { + batchSize: 2, + }) + // this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch') + this.sinon.spy(ContactImporter.prototype, '_sendBatch') + + // Generate some test data. + var data = [] + for (i = 0; i < 5; i++) { + var item = { + email: 'example' + i + '@example.com', + first_name: 'Test', + last_name: 'User' + } + // Lets make the first user produce an error. + if (i === 0) { + item.invalid_field= 'some value' + } + data.push(item) + } + this.contactImporter.push(data) + }) + + it('test_contact_importer sends items in batches', function(done) { + var self = this + this.timeout(30000) + this.contactImporter.on('success', function(result, batch) { + console.log('SUCCESS result', result) + console.log('SUCCESS batch', batch) + }) + this.contactImporter.on('error', function(error, batch) { + console.log('SUCCESS error', error) + console.log('SUCCESS batch', batch) + }) + this.contactImporter.on('drain', function() { + expect(self.contactImporter._sendBatch).to.have.callCount(3) + done() + }) + + + // assert.equal(sg.limiters['/v3/contactdb/recipients'].queue.length, 2) + // assert.equal(sg.limiters['/v3/contactdb/recipients'].pendingItems.length, 5) + // done() + }) +})