diff --git a/packages/contact-importer/src/importer.js b/packages/contact-importer/src/importer.js index a03d78c54..19d2ae2f3 100644 --- a/packages/contact-importer/src/importer.js +++ b/packages/contact-importer/src/importer.js @@ -2,15 +2,14 @@ 'use strict'; const Bottleneck = require('bottleneck'); -const EventEmitter = require('events').EventEmitter; +const { EventEmitter } = require('events'); const chunk = require('lodash.chunk'); const debug = require('debug')('sendgrid'); const util = require('util'); const queue = require('async.queue'); const ensureAsync = require('async.ensureasync'); -const ContactImporter = module.exports = function(sg, options) { - options = options || {}; +const ContactImporter = module.exports = function(sg, options = {}) { const self = this; this.sg = sg; this.pendingItems = []; @@ -32,26 +31,24 @@ const ContactImporter = module.exports = function(sg, options) { this.queue = queue(ensureAsync(this._worker)); // When the last batch is removed from the queue, add any incomplete batches. - this.queue.empty = function() { - if (self.pendingItems.length) { - debug('adding %s items from deferrd queue for processing', self.pendingItems.length); - const batch = self.pendingItems.splice(0); - self.queue.push({ + this.queue.empty = () => { + if (this.pendingItems.length) { + debug('adding %s items from deferrd queue for processing', this.pendingItems.length); + const batch = this.pendingItems.splice(0); + this.queue.push({ data: batch, - owner: self, - }, function(error, result) { + owner: this, + }, (error, { body }) => { if (error) { - return self._notify(error, JSON.parse(error.response.body), batch); + return this._notify(error, JSON.parse(error.response.body), batch); } - return self._notify(null, JSON.parse(result.body), batch); + return this._notify(null, JSON.parse(body), batch); }); } }; // Emit an event when the queue is drained. - this.queue.drain = function() { - self.emit('drain'); - }; + this.queue.drain = () => this.emit('drain') }; util.inherits(ContactImporter, EventEmitter); @@ -71,23 +68,23 @@ ContactImporter.prototype.push = function(data) { const batches = chunk(itemsToProcess, this.batchSize); debug('generated batches %s from %s items', batches.length, data.length); - batches.forEach(function(batch) { + batches.forEach(batch => { // If this batch is full or the queue is empty queue it for processing. - if (batch.length === self.batchSize || !self.queue.length()) { - self.queue.push({ + if (batch.length === this.batchSize || !this.queue.length()) { + this.queue.push({ data: batch, - owner: self, - }, function(error, result) { + owner: this, + }, function(error, { body }) { if (error) { - return self._notify(error, JSON.parse(error.response.body), batch); + return this._notify(error, JSON.parse(error.response.body), batch); } - return self._notify(null, JSON.parse(result.body), batch); + return this._notify(null, JSON.parse(body), batch); }); } // Otherwise, it store it for later. else { debug('the last batch with only %s item is deferred (partial batch)', batch.length); - self.pendingItems = batch; + this.pendingItems = batch; } }); @@ -101,14 +98,14 @@ ContactImporter.prototype.push = function(data) { * @param {Object} task Task to be processed (data in 'data' property) * @param {Function} callback Callback function. */ -ContactImporter.prototype._worker = function(task, callback) { - const context = task.owner; - debug('processing batch (%s items)', task.data.length); - context.throttle.submit(context._sendBatch, context, task.data, callback); +ContactImporter.prototype._worker = function({ owner, data }, callback) { + const context = owner; + debug(`processing batch (${data.length} items)`); + context.throttle.submit(context._sendBatch, context, data, callback); }; ContactImporter.prototype._sendBatch = function(context, data, callback) { - debug('sending batch (%s items)', data.length); + debug(`sending batch (${data.length} items)`); const request = context.sg.emptyRequest(); request.method = 'POST'; @@ -116,18 +113,14 @@ ContactImporter.prototype._sendBatch = function(context, data, callback) { request.body = data; context.sg.API(request) - .then(function(response) { + .then(response => { debug('got response: %o', response); - setTimeout(function() { - context.throttle.incrementReservoir(1); - }, context.rateLimitPeriod); + setTimeout(() => context.throttle.incrementReservoir(1), context.rateLimitPeriod); return callback(null, response); }) - .catch(function(error) { + .catch(error => { debug('got error, %o', error); - setTimeout(function() { - context.throttle.incrementReservoir(1); - }, context.rateLimitPeriod); + setTimeout(() => context.throttle.incrementReservoir(1), context.rateLimitPeriod); return callback(error); }); };