-
Notifications
You must be signed in to change notification settings - Fork 780
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Tom Kirkpatrick
committed
Aug 22, 2016
1 parent
d448eca
commit 6a271ac
Showing
3 changed files
with
206 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,133 @@ | ||
/* eslint dot-notation: 'off' */ | ||
'use strict'; | ||
|
||
var Bottleneck = require('bottleneck'); | ||
var EventEmitter = require('events').EventEmitter; | ||
var chunk = require('lodash.chunk'); | ||
var debug = require('debug')('sendgrid'); | ||
var util = require('util'); | ||
var queue = require('async.queue'); | ||
var ensureAsync = require('async.ensureasync'); | ||
|
||
var ContactImporter = module.exports = function(sg, options) { | ||
options = options || {}; | ||
var self = this; | ||
this.sg = sg; | ||
this.pendingItems = []; | ||
|
||
// Number of items to send per batch. | ||
this.batchSize = options.batchSize || 1500; | ||
|
||
// Max number of requests per rate limit period. | ||
this.rateLimitLimit = options.rateLimitLimit || 3; | ||
|
||
// Length of rate limit period (miliseconds). | ||
this.rateLimitPeriod = options.rateLimitPeriod || 2000; | ||
|
||
// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms. | ||
this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit); | ||
|
||
// Create a queue that wil be used to send batches to the throttler. | ||
this.queue = queue(ensureAsync(this._worker)); | ||
|
||
// When the last batch is removed from the queue, add any incomplete batches. | ||
this.queue.empty = function() { | ||
if (self.pendingItems.length) { | ||
debug('adding %s items from deferrd queue for processing', self.pendingItems.length); | ||
var batch = self.pendingItems.splice(0); | ||
self.queue.push({ | ||
data: batch, | ||
owner: self, | ||
}, function(error, result) { | ||
self._notify(error, JSON.parse(result.body), batch); | ||
}); | ||
} | ||
}; | ||
|
||
// Emit an event when the queue is drained. | ||
this.queue.drain = function() { | ||
self.emit('drain'); | ||
}; | ||
}; | ||
util.inherits(ContactImporter, EventEmitter); | ||
|
||
/** | ||
* Add a new contact, or an array of contact, to the end of the queue. | ||
* | ||
* @param {Array|Object} data A contact or array of contacts. | ||
*/ | ||
ContactImporter.prototype.push = function(data) { | ||
var self = this; | ||
data = Array.isArray(data) ? data : [data]; | ||
|
||
// Add the new items onto the pending items. | ||
var itemsToProcess = this.pendingItems.concat(data); | ||
|
||
// Chunk the pending items into batches and add onto the queue | ||
var batches = chunk(itemsToProcess, this.batchSize); | ||
debug('generated batches %s from %s items', batches.length, data.length); | ||
|
||
batches.forEach(function(batch) { | ||
// If this batch is full or the queue is empty queue it for processing. | ||
if (batch.length === self.batchSize || !self.queue.length) { | ||
self.queue.push({ | ||
data: batch, | ||
owner: self, | ||
}, function(error, result) { | ||
self._notify(error, JSON.parse(result.body), batch); | ||
}); | ||
} | ||
// Otherwise, it store it for later. | ||
else { | ||
debug('the last batch with only %s item is deferred (partial batch)', batch.length); | ||
self.pendingItems = batch; | ||
} | ||
}); | ||
|
||
debug('batches in queue: %s', this.queue.length); | ||
debug('items in deferred queue: %s', this.pendingItems.length); | ||
}; | ||
|
||
/** | ||
* Send a batch of contacts to Sendgrid. | ||
* | ||
* @param {Object} task Task to be processed (data in 'data' property) | ||
* @param {Function} callback Callback function. | ||
*/ | ||
ContactImporter.prototype._worker = function(task, callback) { | ||
var context = task.owner; | ||
debug('processing batch (%s items)', task.data.length); | ||
context.throttle.submit(context._sendBatch, context, task.data, callback); | ||
}; | ||
|
||
ContactImporter.prototype._sendBatch = function(context, data, callback) { | ||
debug('sending batch (%s items)', data.length); | ||
|
||
var request = context.sg.emptyRequest(); | ||
request.method = 'POST'; | ||
request.path = '/v3/contactdb/recipients'; | ||
request.body = data; | ||
|
||
context.sg.API(request) | ||
.then(function(response) { | ||
debug('got response: %o', response); | ||
return callback(null, response); | ||
}) | ||
.catch(function(error) { | ||
debug('got error, %o', error); | ||
return callback(error); | ||
}); | ||
}; | ||
|
||
/** | ||
* Emit the result of processing a batch. | ||
* | ||
* @param {Object} error | ||
* @param {Object} result | ||
*/ | ||
ContactImporter.prototype._notify = function(error, result, batch) { | ||
if (error) { | ||
return this.emit('error', result, batch); | ||
} | ||
return this.emit('success', result, batch); | ||
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js') | ||
var sendgrid = require('../../../') | ||
|
||
var chai = require('chai') | ||
var sinon = require('sinon') | ||
|
||
chai.should() | ||
var expect = chai.expect | ||
chai.use(require('sinon-chai')) | ||
|
||
require('mocha-sinon') | ||
|
||
describe.only('test_contact_importer', function() { | ||
beforeEach(function() { | ||
// Create a new SendGrid instance. | ||
var API_KEY = process.env.API_KEY | ||
var sg = sendgrid(API_KEY) | ||
|
||
// Create a new importer with a batch size of 2. | ||
this.contactImporter = new ContactImporter(sg, { | ||
batchSize: 2, | ||
}) | ||
// this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch') | ||
this.sinon.spy(ContactImporter.prototype, '_sendBatch') | ||
|
||
// Generate some test data. | ||
var data = [] | ||
for (i = 0; i < 5; i++) { | ||
var item = { | ||
email: 'example' + i + '@example.com', | ||
first_name: 'Test', | ||
last_name: 'User' | ||
} | ||
// Lets make the first user produce an error. | ||
if (i === 0) { | ||
item.invalid_field= 'some value' | ||
} | ||
data.push(item) | ||
} | ||
this.contactImporter.push(data) | ||
}) | ||
|
||
it('test_contact_importer sends items in batches', function(done) { | ||
var self = this | ||
this.timeout(30000) | ||
this.contactImporter.on('success', function(result, batch) { | ||
console.log('SUCCESS result', result) | ||
console.log('SUCCESS batch', batch) | ||
}) | ||
this.contactImporter.on('error', function(error, batch) { | ||
console.log('SUCCESS error', error) | ||
console.log('SUCCESS batch', batch) | ||
}) | ||
this.contactImporter.on('drain', function() { | ||
expect(self.contactImporter._sendBatch).to.have.callCount(3) | ||
done() | ||
}) | ||
|
||
|
||
// assert.equal(sg.limiters['/v3/contactdb/recipients'].queue.length, 2) | ||
// assert.equal(sg.limiters['/v3/contactdb/recipients'].pendingItems.length, 5) | ||
// done() | ||
}) | ||
}) |