diff options
author | Tom Kirkpatrick <tom@systemseed.com> | 2016-08-22 17:50:21 +0200 |
---|---|---|
committer | Tom Kirkpatrick <tom@systemseed.com> | 2016-08-22 17:50:21 +0200 |
commit | 80cee1995cf2ac5793f1b6253f1e674e67004ea0 (patch) | |
tree | 914c35854ddcdf835c476e149ea1c1478a59eecf | |
parent | 1dc832b7647d039c07e13ea750ceb2cf851dea52 (diff) | |
download | sendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.zip sendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.tar.gz sendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.tar.bz2 |
Optimise the throttle to send items as soon as possible
-rw-r--r-- | lib/helpers/contact-importer/contact-importer.js | 21 | ||||
-rw-r--r-- | test/helpers/contact-importer/contact-importer.test.js | 2 |
2 files changed, 18 insertions, 5 deletions
diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js index 017fa69..eb5778b 100644 --- a/lib/helpers/contact-importer/contact-importer.js +++ b/lib/helpers/contact-importer/contact-importer.js @@ -25,7 +25,8 @@ var ContactImporter = module.exports = function(sg, options) { this.rateLimitPeriod = options.rateLimitPeriod || 2000; // Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms. - this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit); + this.throttle = new Bottleneck(0, 0); + this.throttle.changeReservoir(this.rateLimitLimit); // Create a queue that wil be used to send batches to the throttler. this.queue = queue(ensureAsync(this._worker)); @@ -39,7 +40,10 @@ var ContactImporter = module.exports = function(sg, options) { data: batch, owner: self, }, function(error, result) { - self._notify(error, JSON.parse(result.body), batch); + if (error) { + return self._notify(error, JSON.parse(error.response.body), batch); + } + return self._notify(null, JSON.parse(result.body), batch); }); } }; @@ -74,7 +78,10 @@ ContactImporter.prototype.push = function(data) { data: batch, owner: self, }, function(error, result) { - self._notify(error, JSON.parse(result.body), batch); + if (error) { + return self._notify(error, JSON.parse(error.response.body), batch); + } + return self._notify(null, JSON.parse(result.body), batch); }); } // Otherwise, it store it for later. @@ -111,10 +118,16 @@ ContactImporter.prototype._sendBatch = function(context, data, callback) { context.sg.API(request) .then(function(response) { debug('got response: %o', response); + setTimeout(function() { + context.throttle.incrementReservoir(1); + }, context.rateLimitPeriod); return callback(null, response); }) .catch(function(error) { debug('got error, %o', error); + setTimeout(function() { + context.throttle.incrementReservoir(1); + }, context.rateLimitPeriod); return callback(error); }); }; @@ -127,7 +140,7 @@ ContactImporter.prototype._sendBatch = function(context, data, callback) { */ ContactImporter.prototype._notify = function(error, result, batch) { if (error) { - return this.emit('error', result, batch); + return this.emit('error', error, batch); } return this.emit('success', result, batch); }; diff --git a/test/helpers/contact-importer/contact-importer.test.js b/test/helpers/contact-importer/contact-importer.test.js index 38ce3b9..e0e00a3 100644 --- a/test/helpers/contact-importer/contact-importer.test.js +++ b/test/helpers/contact-importer/contact-importer.test.js @@ -32,7 +32,7 @@ describe.only('test_contact_importer', function() { last_name: 'User' } // Lets make the first user produce an error. - if (i === 0) { + if (i === 1) { item.invalid_field= 'some value' } data.push(item) |