summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTom Kirkpatrick <tom@systemseed.com>2016-08-22 17:50:21 +0200
committerTom Kirkpatrick <tom@systemseed.com>2016-08-22 17:50:21 +0200
commit80cee1995cf2ac5793f1b6253f1e674e67004ea0 (patch)
tree914c35854ddcdf835c476e149ea1c1478a59eecf
parent1dc832b7647d039c07e13ea750ceb2cf851dea52 (diff)
downloadsendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.zip
sendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.tar.gz
sendgrid-nodejs-80cee1995cf2ac5793f1b6253f1e674e67004ea0.tar.bz2
Optimise the throttle to send items as soon as possible
-rw-r--r--lib/helpers/contact-importer/contact-importer.js21
-rw-r--r--test/helpers/contact-importer/contact-importer.test.js2
2 files changed, 18 insertions, 5 deletions
diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js
index 017fa69..eb5778b 100644
--- a/lib/helpers/contact-importer/contact-importer.js
+++ b/lib/helpers/contact-importer/contact-importer.js
@@ -25,7 +25,8 @@ var ContactImporter = module.exports = function(sg, options) {
this.rateLimitPeriod = options.rateLimitPeriod || 2000;
// Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
- this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit);
+ this.throttle = new Bottleneck(0, 0);
+ this.throttle.changeReservoir(this.rateLimitLimit);
// Create a queue that wil be used to send batches to the throttler.
this.queue = queue(ensureAsync(this._worker));
@@ -39,7 +40,10 @@ var ContactImporter = module.exports = function(sg, options) {
data: batch,
owner: self,
}, function(error, result) {
- self._notify(error, JSON.parse(result.body), batch);
+ if (error) {
+ return self._notify(error, JSON.parse(error.response.body), batch);
+ }
+ return self._notify(null, JSON.parse(result.body), batch);
});
}
};
@@ -74,7 +78,10 @@ ContactImporter.prototype.push = function(data) {
data: batch,
owner: self,
}, function(error, result) {
- self._notify(error, JSON.parse(result.body), batch);
+ if (error) {
+ return self._notify(error, JSON.parse(error.response.body), batch);
+ }
+ return self._notify(null, JSON.parse(result.body), batch);
});
}
// Otherwise, it store it for later.
@@ -111,10 +118,16 @@ ContactImporter.prototype._sendBatch = function(context, data, callback) {
context.sg.API(request)
.then(function(response) {
debug('got response: %o', response);
+ setTimeout(function() {
+ context.throttle.incrementReservoir(1);
+ }, context.rateLimitPeriod);
return callback(null, response);
})
.catch(function(error) {
debug('got error, %o', error);
+ setTimeout(function() {
+ context.throttle.incrementReservoir(1);
+ }, context.rateLimitPeriod);
return callback(error);
});
};
@@ -127,7 +140,7 @@ ContactImporter.prototype._sendBatch = function(context, data, callback) {
*/
ContactImporter.prototype._notify = function(error, result, batch) {
if (error) {
- return this.emit('error', result, batch);
+ return this.emit('error', error, batch);
}
return this.emit('success', result, batch);
};
diff --git a/test/helpers/contact-importer/contact-importer.test.js b/test/helpers/contact-importer/contact-importer.test.js
index 38ce3b9..e0e00a3 100644
--- a/test/helpers/contact-importer/contact-importer.test.js
+++ b/test/helpers/contact-importer/contact-importer.test.js
@@ -32,7 +32,7 @@ describe.only('test_contact_importer', function() {
last_name: 'User'
}
// Lets make the first user produce an error.
- if (i === 0) {
+ if (i === 1) {
item.invalid_field= 'some value'
}
data.push(item)