Skip to content

Commit

Permalink
Add contact-importer helper
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Kirkpatrick committed Aug 22, 2016
1 parent d448eca commit 1dc832b
Show file tree
Hide file tree
Showing 4 changed files with 207 additions and 1 deletion.
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
exports = module.exports = require('./lib/sendgrid');
exports.mail = require('./lib/helpers/mail/mail.js');
exports.importer = require('./lib/helpers/contact-importer/contact-importer.js');
133 changes: 133 additions & 0 deletions lib/helpers/contact-importer/contact-importer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/* eslint dot-notation: 'off' */
'use strict';

var Bottleneck = require('bottleneck');
var EventEmitter = require('events').EventEmitter;
var chunk = require('lodash.chunk');
var debug = require('debug')('sendgrid');
var util = require('util');
var queue = require('async.queue');
var ensureAsync = require('async.ensureasync');

var ContactImporter = module.exports = function(sg, options) {
options = options || {};
var self = this;
this.sg = sg;
this.pendingItems = [];

// Number of items to send per batch.
this.batchSize = options.batchSize || 1500;

// Max number of requests per rate limit period.
this.rateLimitLimit = options.rateLimitLimit || 3;

// Length of rate limit period (miliseconds).
this.rateLimitPeriod = options.rateLimitPeriod || 2000;

// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit);

// Create a queue that wil be used to send batches to the throttler.
this.queue = queue(ensureAsync(this._worker));

// When the last batch is removed from the queue, add any incomplete batches.
this.queue.empty = function() {
if (self.pendingItems.length) {
debug('adding %s items from deferrd queue for processing', self.pendingItems.length);
var batch = self.pendingItems.splice(0);
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
self._notify(error, JSON.parse(result.body), batch);
});
}
};

// Emit an event when the queue is drained.
this.queue.drain = function() {
self.emit('drain');
};
};
util.inherits(ContactImporter, EventEmitter);

/**
* Add a new contact, or an array of contact, to the end of the queue.
*
* @param {Array|Object} data A contact or array of contacts.
*/
ContactImporter.prototype.push = function(data) {
var self = this;
data = Array.isArray(data) ? data : [data];

// Add the new items onto the pending items.
var itemsToProcess = this.pendingItems.concat(data);

// Chunk the pending items into batches and add onto the queue
var batches = chunk(itemsToProcess, this.batchSize);
debug('generated batches %s from %s items', batches.length, data.length);

batches.forEach(function(batch) {
// If this batch is full or the queue is empty queue it for processing.
if (batch.length === self.batchSize || !self.queue.length()) {
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
self._notify(error, JSON.parse(result.body), batch);
});
}
// Otherwise, it store it for later.
else {
debug('the last batch with only %s item is deferred (partial batch)', batch.length);
self.pendingItems = batch;
}
});

debug('batches in queue: %s', this.queue.length());
debug('items in deferred queue: %s', this.pendingItems.length);
};

/**
* Send a batch of contacts to Sendgrid.
*
* @param {Object} task Task to be processed (data in 'data' property)
* @param {Function} callback Callback function.
*/
ContactImporter.prototype._worker = function(task, callback) {
var context = task.owner;
debug('processing batch (%s items)', task.data.length);
context.throttle.submit(context._sendBatch, context, task.data, callback);
};

ContactImporter.prototype._sendBatch = function(context, data, callback) {
debug('sending batch (%s items)', data.length);

var request = context.sg.emptyRequest();
request.method = 'POST';
request.path = '/v3/contactdb/recipients';
request.body = data;

context.sg.API(request)
.then(function(response) {
debug('got response: %o', response);
return callback(null, response);
})
.catch(function(error) {
debug('got error, %o', error);
return callback(error);
});
};

/**
* Emit the result of processing a batch.
*
* @param {Object} error
* @param {Object} result
*/
ContactImporter.prototype._notify = function(error, result, batch) {
if (error) {
return this.emit('error', result, batch);
}
return this.emit('success', result, batch);
};
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@
"node": ">= 0.4.7"
},
"dependencies": {
"async.ensureasync": "^0.5.2",
"async.queue": "^0.5.2",
"bottleneck": "^1.12.0",
"lodash.chunk": "^4.2.0",
"sendgrid-rest": "^2.2.1"
},
"devDependencies": {
"chai": "^3.5.0",
"debug": "^2.2.0",
"eslint": "^3.1.0",
"mocha": "^2.4.5"
"mocha": "^2.4.5",
"mocha-sinon": "^1.1.5",
"sinon": "^1.17.5",
"sinon-chai": "^2.8.0"
},
"scripts": {
"lint": "eslint . --fix",
Expand Down
64 changes: 64 additions & 0 deletions test/helpers/contact-importer/contact-importer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js')
var sendgrid = require('../../../')

var chai = require('chai')
var sinon = require('sinon')

chai.should()
var expect = chai.expect
chai.use(require('sinon-chai'))

require('mocha-sinon')

describe.only('test_contact_importer', function() {
beforeEach(function() {
// Create a new SendGrid instance.
var API_KEY = process.env.API_KEY
var sg = sendgrid(API_KEY)

// Create a new importer with a batch size of 2.
this.contactImporter = new ContactImporter(sg, {
batchSize: 2,
})
// this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch')
this.sinon.spy(ContactImporter.prototype, '_sendBatch')

// Generate some test data.
var data = []
for (i = 0; i < 5; i++) {
var item = {
email: 'example' + i + '@example.com',
first_name: 'Test',
last_name: 'User'
}
// Lets make the first user produce an error.
if (i === 0) {
item.invalid_field= 'some value'
}
data.push(item)
}
this.contactImporter.push(data)
})

it('test_contact_importer sends items in batches', function(done) {
var self = this
this.timeout(30000)
this.contactImporter.on('success', function(result, batch) {
console.log('SUCCESS result', result)
console.log('SUCCESS batch', batch)
})
this.contactImporter.on('error', function(error, batch) {
console.log('SUCCESS error', error)
console.log('SUCCESS batch', batch)
})
this.contactImporter.on('drain', function() {
expect(self.contactImporter._sendBatch).to.have.callCount(3)
done()
})


// assert.equal(sg.limiters['/v3/contactdb/recipients'].queue.length, 2)
// assert.equal(sg.limiters['/v3/contactdb/recipients'].pendingItems.length, 5)
// done()
})
})

0 comments on commit 1dc832b

Please sign in to comment.