summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorElmer Thomas <elmer@ThinkingSerious.com>2016-09-14 17:03:02 -0700
committerGitHub <noreply@github.com>2016-09-14 17:03:02 -0700
commit25623f6830e5c0f00de1e3f3db24b6f90f1781c8 (patch)
tree7abe8f3bce20bdcc0edbc9863f2847509195b7f6
parent77e49732bc8912e8b2b66a182d38794ce672c03d (diff)
parent9b4b22c4995d13420f25a2d4250602f722bdd10d (diff)
downloadsendgrid-nodejs-25623f6830e5c0f00de1e3f3db24b6f90f1781c8.zip
sendgrid-nodejs-25623f6830e5c0f00de1e3f3db24b6f90f1781c8.tar.gz
sendgrid-nodejs-25623f6830e5c0f00de1e3f3db24b6f90f1781c8.tar.bz2
Merge pull request #278 from fullcube/tkp/contact-importer
Tkp/contact importer
-rw-r--r--index.js1
-rw-r--r--lib/helpers/contact-importer/contact-importer.js146
-rw-r--r--package.json10
-rw-r--r--test/helpers/contact-importer/contact-importer.test.js59
4 files changed, 215 insertions, 1 deletions
diff --git a/index.js b/index.js
index 4473136..1b49a0d 100644
--- a/index.js
+++ b/index.js
@@ -1,2 +1,3 @@
exports = module.exports = require('./lib/sendgrid');
exports.mail = require('./lib/helpers/mail/mail.js');
+exports.importer = require('./lib/helpers/contact-importer/contact-importer.js');
diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js
new file mode 100644
index 0000000..eb5778b
--- /dev/null
+++ b/lib/helpers/contact-importer/contact-importer.js
@@ -0,0 +1,146 @@
+/* eslint dot-notation: 'off' */
+'use strict';
+
+var Bottleneck = require('bottleneck');
+var EventEmitter = require('events').EventEmitter;
+var chunk = require('lodash.chunk');
+var debug = require('debug')('sendgrid');
+var util = require('util');
+var queue = require('async.queue');
+var ensureAsync = require('async.ensureasync');
+
+var ContactImporter = module.exports = function(sg, options) {
+ options = options || {};
+ var self = this;
+ this.sg = sg;
+ this.pendingItems = [];
+
+ // Number of items to send per batch.
+ this.batchSize = options.batchSize || 1500;
+
+ // Max number of requests per rate limit period.
+ this.rateLimitLimit = options.rateLimitLimit || 3;
+
+ // Length of rate limit period (miliseconds).
+ this.rateLimitPeriod = options.rateLimitPeriod || 2000;
+
+ // Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms.
+ this.throttle = new Bottleneck(0, 0);
+ this.throttle.changeReservoir(this.rateLimitLimit);
+
+ // Create a queue that wil be used to send batches to the throttler.
+ this.queue = queue(ensureAsync(this._worker));
+
+ // When the last batch is removed from the queue, add any incomplete batches.
+ this.queue.empty = function() {
+ if (self.pendingItems.length) {
+ debug('adding %s items from deferrd queue for processing', self.pendingItems.length);
+ var batch = self.pendingItems.splice(0);
+ self.queue.push({
+ data: batch,
+ owner: self,
+ }, function(error, result) {
+ if (error) {
+ return self._notify(error, JSON.parse(error.response.body), batch);
+ }
+ return self._notify(null, JSON.parse(result.body), batch);
+ });
+ }
+ };
+
+ // Emit an event when the queue is drained.
+ this.queue.drain = function() {
+ self.emit('drain');
+ };
+};
+util.inherits(ContactImporter, EventEmitter);
+
+/**
+ * Add a new contact, or an array of contact, to the end of the queue.
+ *
+ * @param {Array|Object} data A contact or array of contacts.
+ */
+ContactImporter.prototype.push = function(data) {
+ var self = this;
+ data = Array.isArray(data) ? data : [data];
+
+ // Add the new items onto the pending items.
+ var itemsToProcess = this.pendingItems.concat(data);
+
+ // Chunk the pending items into batches and add onto the queue
+ var batches = chunk(itemsToProcess, this.batchSize);
+ debug('generated batches %s from %s items', batches.length, data.length);
+
+ batches.forEach(function(batch) {
+ // If this batch is full or the queue is empty queue it for processing.
+ if (batch.length === self.batchSize || !self.queue.length()) {
+ self.queue.push({
+ data: batch,
+ owner: self,
+ }, function(error, result) {
+ if (error) {
+ return self._notify(error, JSON.parse(error.response.body), batch);
+ }
+ return self._notify(null, JSON.parse(result.body), batch);
+ });
+ }
+ // Otherwise, it store it for later.
+ else {
+ debug('the last batch with only %s item is deferred (partial batch)', batch.length);
+ self.pendingItems = batch;
+ }
+ });
+
+ debug('batches in queue: %s', this.queue.length());
+ debug('items in deferred queue: %s', this.pendingItems.length);
+};
+
+/**
+ * Send a batch of contacts to Sendgrid.
+ *
+ * @param {Object} task Task to be processed (data in 'data' property)
+ * @param {Function} callback Callback function.
+ */
+ContactImporter.prototype._worker = function(task, callback) {
+ var context = task.owner;
+ debug('processing batch (%s items)', task.data.length);
+ context.throttle.submit(context._sendBatch, context, task.data, callback);
+};
+
+ContactImporter.prototype._sendBatch = function(context, data, callback) {
+ debug('sending batch (%s items)', data.length);
+
+ var request = context.sg.emptyRequest();
+ request.method = 'POST';
+ request.path = '/v3/contactdb/recipients';
+ request.body = data;
+
+ context.sg.API(request)
+ .then(function(response) {
+ debug('got response: %o', response);
+ setTimeout(function() {
+ context.throttle.incrementReservoir(1);
+ }, context.rateLimitPeriod);
+ return callback(null, response);
+ })
+ .catch(function(error) {
+ debug('got error, %o', error);
+ setTimeout(function() {
+ context.throttle.incrementReservoir(1);
+ }, context.rateLimitPeriod);
+ return callback(error);
+ });
+};
+
+/**
+ * Emit the result of processing a batch.
+ *
+ * @param {Object} error
+ * @param {Object} result
+ */
+ContactImporter.prototype._notify = function(error, result, batch) {
+ if (error) {
+ return this.emit('error', error, batch);
+ }
+ return this.emit('success', result, batch);
+};
diff --git a/package.json b/package.json
index 30f5b66..2c41af1 100644
--- a/package.json
+++ b/package.json
@@ -22,12 +22,20 @@
"node": ">= 0.4.7"
},
"dependencies": {
+ "async.ensureasync": "^0.5.2",
+ "async.queue": "^0.5.2",
+ "bottleneck": "^1.12.0",
+ "lodash.chunk": "^4.2.0",
"sendgrid-rest": "^2.2.1"
},
"devDependencies": {
"chai": "^3.5.0",
+ "debug": "^2.2.0",
"eslint": "^3.1.0",
- "mocha": "^2.4.5"
+ "mocha": "^2.4.5",
+ "mocha-sinon": "^1.1.5",
+ "sinon": "^1.17.5",
+ "sinon-chai": "^2.8.0"
},
"scripts": {
"lint": "eslint . --fix",
diff --git a/test/helpers/contact-importer/contact-importer.test.js b/test/helpers/contact-importer/contact-importer.test.js
new file mode 100644
index 0000000..eecbb15
--- /dev/null
+++ b/test/helpers/contact-importer/contact-importer.test.js
@@ -0,0 +1,59 @@
+var ContactImporter = require('../../../lib/helpers/contact-importer/contact-importer.js')
+var sendgrid = require('../../../')
+
+var chai = require('chai')
+var sinon = require('sinon')
+
+chai.should()
+var expect = chai.expect
+chai.use(require('sinon-chai'))
+
+require('mocha-sinon')
+
+describe.only('test_contact_importer', function() {
+ beforeEach(function() {
+ // Create a new SendGrid instance.
+ var API_KEY = process.env.API_KEY
+ var sg = sendgrid(API_KEY)
+
+ // Create a new importer with a batch size of 2.
+ this.contactImporter = new ContactImporter(sg, {
+ batchSize: 2,
+ })
+ // this.spy = sinon.spy(ContactImporter.prototype, '_sendBatch')
+ this.sinon.spy(ContactImporter.prototype, '_sendBatch')
+
+ // Generate some test data.
+ var data = []
+ for (i = 0; i < 5; i++) {
+ var item = {
+ email: 'example' + i + '@example.com',
+ first_name: 'Test',
+ last_name: 'User'
+ }
+ // Lets make the first user produce an error.
+ if (i === 1) {
+ item.invalid_field= 'some value'
+ }
+ data.push(item)
+ }
+ this.contactImporter.push(data)
+ })
+
+ it('test_contact_importer sends items in batches', function(done) {
+ var self = this
+ this.timeout(30000)
+ this.contactImporter.on('success', function(result, batch) {
+ console.log('SUCCESS result', result)
+ console.log('SUCCESS batch', batch)
+ })
+ this.contactImporter.on('error', function(error, batch) {
+ console.log('SUCCESS error', error)
+ console.log('SUCCESS batch', batch)
+ })
+ this.contactImporter.on('drain', function() {
+ expect(self.contactImporter._sendBatch).to.have.callCount(3)
+ done()
+ })
+ })
+})