Skip to content

Commit

Permalink
fixes sendgrid#532
Browse files Browse the repository at this point in the history
  • Loading branch information
itaditya committed Oct 24, 2017
1 parent 094260f commit d0b0a32
Showing 1 changed file with 29 additions and 36 deletions.
65 changes: 29 additions & 36 deletions packages/contact-importer/src/importer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@
'use strict';

const Bottleneck = require('bottleneck');
const EventEmitter = require('events').EventEmitter;
const { EventEmitter } = require('events');
const chunk = require('lodash.chunk');
const debug = require('debug')('sendgrid');
const util = require('util');
const queue = require('async.queue');
const ensureAsync = require('async.ensureasync');

const ContactImporter = module.exports = function(sg, options) {
options = options || {};
const ContactImporter = module.exports = function(sg, options = {}) {
const self = this;
this.sg = sg;
this.pendingItems = [];
Expand All @@ -32,26 +31,24 @@ const ContactImporter = module.exports = function(sg, options) {
this.queue = queue(ensureAsync(this._worker));

// When the last batch is removed from the queue, add any incomplete batches.
this.queue.empty = function() {
if (self.pendingItems.length) {
debug('adding %s items from deferrd queue for processing', self.pendingItems.length);
const batch = self.pendingItems.splice(0);
self.queue.push({
this.queue.empty = () => {
if (this.pendingItems.length) {
debug('adding %s items from deferrd queue for processing', this.pendingItems.length);
const batch = this.pendingItems.splice(0);
this.queue.push({
data: batch,
owner: self,
}, function(error, result) {
owner: this,
}, (error, { body }) => {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
return this._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
return this._notify(null, JSON.parse(body), batch);
});
}
};

// Emit an event when the queue is drained.
this.queue.drain = function() {
self.emit('drain');
};
this.queue.drain = () => this.emit('drain')
};
util.inherits(ContactImporter, EventEmitter);

Expand All @@ -71,23 +68,23 @@ ContactImporter.prototype.push = function(data) {
const batches = chunk(itemsToProcess, this.batchSize);
debug('generated batches %s from %s items', batches.length, data.length);

batches.forEach(function(batch) {
batches.forEach(batch => {
// If this batch is full or the queue is empty queue it for processing.
if (batch.length === self.batchSize || !self.queue.length()) {
self.queue.push({
if (batch.length === this.batchSize || !this.queue.length()) {
this.queue.push({
data: batch,
owner: self,
}, function(error, result) {
owner: this,
}, function(error, { body }) {
if (error) {
return self._notify(error, JSON.parse(error.response.body), batch);
return this._notify(error, JSON.parse(error.response.body), batch);
}
return self._notify(null, JSON.parse(result.body), batch);
return this._notify(null, JSON.parse(body), batch);
});
}
// Otherwise, it store it for later.
else {
debug('the last batch with only %s item is deferred (partial batch)', batch.length);
self.pendingItems = batch;
this.pendingItems = batch;
}
});

Expand All @@ -101,33 +98,29 @@ ContactImporter.prototype.push = function(data) {
* @param {Object} task Task to be processed (data in 'data' property)
* @param {Function} callback Callback function.
*/
ContactImporter.prototype._worker = function(task, callback) {
const context = task.owner;
debug('processing batch (%s items)', task.data.length);
context.throttle.submit(context._sendBatch, context, task.data, callback);
ContactImporter.prototype._worker = function({ owner, data }, callback) {
const context = owner;
debug(`processing batch (${data.length} items)`);
context.throttle.submit(context._sendBatch, context, data, callback);
};

ContactImporter.prototype._sendBatch = function(context, data, callback) {
debug('sending batch (%s items)', data.length);
debug(`sending batch (${data.length} items)`);

const request = context.sg.emptyRequest();
request.method = 'POST';
request.path = '/v3/contactdb/recipients';
request.body = data;

context.sg.API(request)
.then(function(response) {
.then(response => {
debug('got response: %o', response);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
setTimeout(() => context.throttle.incrementReservoir(1), context.rateLimitPeriod);
return callback(null, response);
})
.catch(function(error) {
.catch(error => {
debug('got error, %o', error);
setTimeout(function() {
context.throttle.incrementReservoir(1);
}, context.rateLimitPeriod);
setTimeout(() => context.throttle.incrementReservoir(1), context.rateLimitPeriod);
return callback(error);
});
};
Expand Down

0 comments on commit d0b0a32

Please sign in to comment.