Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tkp/contact importer #278

Merged
merged 4 commits into from
Sep 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
exports = module.exports = require('./lib/sendgrid');
exports.mail = require('./lib/helpers/mail/mail.js');
exports.importer = require('./lib/helpers/contact-importer/contact-importer.js');
146 changes: 146 additions & 0 deletions lib/helpers/contact-importer/contact-importer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/* eslint dot-notation: 'off' */
'use strict';

var Bottleneck = require('bottleneck');
var EventEmitter = require('events').EventEmitter;
var chunk = require('lodash.chunk');
var debug = require('debug')('sendgrid');
var util = require('util');
var queue = require('async.queue');
var ensureAsync = require('async.ensureasync');

var ContactImporter = module.exports = function(sg, options) {
options = options || {};
var self = this;
this.sg = sg;
this.pendingItems = [];

// Number of items to send per batch.
this.batchSize = options.batchSize || 1500;

// Max number of requests per rate limit period.
this.rateLimitLimit = options.rateLimitLimit || 3;

// Length of rate limit period (miliseconds).
this.rateLimitPeriod = options.rateLimitPeriod || 2000;

// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
this.throttle = new Bottleneck(0, 0);
this.throttle.changeReservoir(this.rateLimitLimit);

// Create a queue that wil be used to send batches to the throttler.
this.queue = queue(ensureAsync(this._worker));

// When the last batch is removed from the queue, add any incomplete batches.
this.queue.empty = function() {
if (self.pendingItems.length) {
debug('adding %s items from deferrd queue for processing', self.pendingItems.length);
var batch = self.pendingItems.splice(0);
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
});
}
};

// Emit an event when the queue is drained.
this.queue.drain = function() {
self.emit('drain');
};
};
util.inherits(ContactImporter, EventEmitter);

/**
* Add a new contact, or an array of contact, to the end of the queue.
*
* @param {Array|Object} data A contact or array of contacts.
*/
ContactImporter.prototype.push = function(data) {
var self = this;
data = Array.isArray(data) ? data : [data];

// Add the new items onto the pending items.
var itemsToProcess = this.pendingItems.concat(data);

// Chunk the pending items into batches and add onto the queue
var batches = chunk(itemsToProcess, this.batchSize);
debug('generated batches %s from %s items', batches.length, data.length);

batches.forEach(function(batch) {
// If this batch is full or the queue is empty queue it for processing.
if (batch.length === self.batchSize || !self.queue.length()) {
self.queue.push({
data: batch,
owner: self,
}, function(error, result) {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
});
}
// Otherwise, it store it for later.
else {
debug('the last batch with only %s item is deferred (partial batch)', batch.length);
self.pendingItems = batch;
}
});

debug('batches in queue: %s', this.queue.length());
debug('items in deferred queue: %s', this.pendingItems.length);
};

/**
* Send a batch of contacts to Sendgrid.
*
* @param {Object} task Task to be processed (data in 'data' property)
* @param {Function} callback Callback function.
*/
ContactImporter.prototype._worker = function(task, callback) {
var context = task.owner;
debug('processing batch (%s items)', task.data.length);
context.throttle.submit(context._sendBatch, context, task.data, callback);
};

ContactImporter.prototype._sendBatch = function(context, data, callback) {
debug('sending batch (%s items)', data.length);

var request = context.sg.emptyRequest();
request.method = 'POST';
request.path = '/v3/contactdb/recipients';
request.body = data;

context.sg.API(request)
.then(function(response) {
debug('got response: %o', response);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(null, response);
})
.catch(function(error) {
debug('got error, %o', error);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
return callback(error);
});
};

/**
* Emit the result of processing a batch.
*
* @param {Object} error
* @param {Object} result
*/
ContactImporter.prototype._notify = function(error, result, batch) {
if (error) {
return this.emit('error', error, batch);
}
return this.emit('success', result, batch);
};
79 changes: 42 additions & 37 deletions lib/sendgrid.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,55 +64,60 @@ function makeHeaders(apiKey, globalHeaders) {
* SendGrid allows for quick and easy access to the v3 Web API
*/
function SendGrid(apiKey, host, globalHeaders) {
return new SendGridInstance(apiKey, host, globalHeaders);
}

/**
* SendGrid allows for quick and easy access to the v3 Web API
*/
function SendGridInstance(apiKey, host, globalHeaders) {
//Create global request
var globalRequest = getEmptyRequest({
this.globalRequest = getEmptyRequest({
host: host || 'api.sendgrid.com',
headers: makeHeaders(apiKey, globalHeaders),
});

//Initialize new client
var client = new Client(globalRequest);
this.client = new Client(this.globalRequest);
}

//Interact with the API with this function
SendGrid.API = function(request, callback) {
//Interact with the API with this function
SendGridInstance.prototype.API = function(request, callback) {
var self = this;

//If no callback provided, we will return a promise
if (!callback) {
if (!SendGrid.Promise) {
throw new SendGridError('Promise API not supported');
}
return new SendGrid.Promise(function(resolve, reject) {
client.API(request, function(response) {
if (isValidResponse(response)) {
resolve(response);
}
else {
var error = new SendGridError('Response error');
error.response = response;
reject(error);
}
});
});
//If no callback provided, we will return a promise
if (!callback) {
if (!SendGrid.Promise) {
throw new SendGridError('Promise API not supported');
}

//Use callback
client.API(request, function(response) {
if (isValidResponse(response)) {
callback(null, response);
}
else {
var error = new SendGridError('Response error');
callback(error, response);
}
return new SendGrid.Promise(function(resolve, reject) {
self.client.API(request, function(response) {
if (isValidResponse(response)) {
resolve(response);
}
else {
var error = new SendGridError('Response error');
error.response = response;
reject(error);
}
});
});
};
}

//Set requests
SendGrid.emptyRequest = getEmptyRequest;
SendGrid.globalRequest = globalRequest;
return SendGrid;
}
//Use callback
self.client.API(request, function(response) {
if (isValidResponse(response)) {
callback(null, response);
}
else {
var error = new SendGridError('Response error');
callback(error, response);
}
});
};

//Set requests
SendGridInstance.prototype.emptyRequest = getEmptyRequest;

//Try to use native promises by default
if (typeof Promise !== 'undefined') {
Expand Down
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,20 @@
"node": ">= 0.4.7"
},
"dependencies": {
"async.ensureasync": "^0.5.2",
"async.queue": "^0.5.2",
"bottleneck": "^1.12.0",
"lodash.chunk": "^4.2.0",
"sendgrid-rest": "^2.2.1"
},
"devDependencies": {
"chai": "^3.5.0",
"debug": "^2.2.0",
"eslint": "^3.1.0",
"mocha": "^2.4.5"
"mocha": "^2.4.5",
"mocha-sinon": "^1.1.5",
"sinon": "^1.17.5",
"sinon-chai": "^2.8.0"
},
"scripts": {
"lint": "eslint . --fix",
Expand Down
59 changes: 59 additions & 0 deletions test/helpers/contact-importer/contact-importer.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js')
var sendgrid = require('../../../')

var chai = require('chai')
var sinon = require('sinon')

chai.should()
var expect = chai.expect
chai.use(require('sinon-chai'))

require('mocha-sinon')

describe.only('test_contact_importer', function() {
beforeEach(function() {
// Create a new SendGrid instance.
var API_KEY = process.env.API_KEY
var sg = sendgrid(API_KEY)

// Create a new importer with a batch size of 2.
this.contactImporter = new ContactImporter(sg, {
batchSize: 2,
})
// this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch')
this.sinon.spy(ContactImporter.prototype, '_sendBatch')

// Generate some test data.
var data = []
for (i = 0; i < 5; i++) {
var item = {
email: 'example' + i + '@example.com',
first_name: 'Test',
last_name: 'User'
}
// Lets make the first user produce an error.
if (i === 1) {
item.invalid_field= 'some value'
}
data.push(item)
}
this.contactImporter.push(data)
})

it('test_contact_importer sends items in batches', function(done) {
var self = this
this.timeout(30000)
this.contactImporter.on('success', function(result, batch) {
console.log('SUCCESS result', result)
console.log('SUCCESS batch', batch)
})
this.contactImporter.on('error', function(error, batch) {
console.log('SUCCESS error', error)
console.log('SUCCESS batch', batch)
})
this.contactImporter.on('drain', function() {
expect(self.contactImporter._sendBatch).to.have.callCount(3)
done()
})
})
})