diff options
author | Tom Kirkpatrick <tom@systemseed.com> | 2016-08-22 10:59:23 +0200 |
---|---|---|
committer | Tom Kirkpatrick <tom@systemseed.com> | 2016-08-22 14:17:26 +0200 |
commit | 1dc832b7647d039c07e13ea750ceb2cf851dea52 (patch) | |
tree | 1877be1151767cafc9c3d548c7f1d265fc6ac006 /lib | |
parent | d448ecae0ba4ce60733d4729ada035fddc794410 (diff) | |
download | sendgrid-nodejs-1dc832b7647d039c07e13ea750ceb2cf851dea52.zip sendgrid-nodejs-1dc832b7647d039c07e13ea750ceb2cf851dea52.tar.gz sendgrid-nodejs-1dc832b7647d039c07e13ea750ceb2cf851dea52.tar.bz2 |
Add contact-importer helper
Diffstat (limited to 'lib')
-rw-r--r-- | lib/helpers/contact-importer/contact-importer.js | 133 |
1 files changed, 133 insertions, 0 deletions
diff --git a/lib/helpers/contact-importer/contact-importer.js b/lib/helpers/contact-importer/contact-importer.js new file mode 100644 index 0000000..017fa69 --- /dev/null +++ b/lib/helpers/contact-importer/contact-importer.js @@ -0,0 +1,133 @@ +/* eslint dot-notation: 'off' */ +'use strict'; + +var Bottleneck = require('bottleneck'); +var EventEmitter = require('events').EventEmitter; +var chunk = require('lodash.chunk'); +var debug = require('debug')('sendgrid'); +var util = require('util'); +var queue = require('async.queue'); +var ensureAsync = require('async.ensureasync'); + +var ContactImporter = module.exports = function(sg, options) { + options = options || {}; + var self = this; + this.sg = sg; + this.pendingItems = []; + + // Number of items to send per batch. + this.batchSize = options.batchSize || 1500; + + // Max number of requests per rate limit period. + this.rateLimitLimit = options.rateLimitLimit || 3; + + // Length of rate limit period (miliseconds). + this.rateLimitPeriod = options.rateLimitPeriod || 2000; + + // Create a throttler that will process no more than `rateLimitLimit` requests every `rateLimitPeriod` ms. + this.throttle = new Bottleneck(1, this.rateLimitPeriod / this.rateLimitLimit); + + // Create a queue that wil be used to send batches to the throttler. + this.queue = queue(ensureAsync(this._worker)); + + // When the last batch is removed from the queue, add any incomplete batches. + this.queue.empty = function() { + if (self.pendingItems.length) { + debug('adding %s items from deferrd queue for processing', self.pendingItems.length); + var batch = self.pendingItems.splice(0); + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + self._notify(error, JSON.parse(result.body), batch); + }); + } + }; + + // Emit an event when the queue is drained. + this.queue.drain = function() { + self.emit('drain'); + }; +}; +util.inherits(ContactImporter, EventEmitter); + +/** + * Add a new contact, or an array of contact, to the end of the queue. + * + * @param {Array|Object} data A contact or array of contacts. + */ +ContactImporter.prototype.push = function(data) { + var self = this; + data = Array.isArray(data) ? data : [data]; + + // Add the new items onto the pending items. + var itemsToProcess = this.pendingItems.concat(data); + + // Chunk the pending items into batches and add onto the queue + var batches = chunk(itemsToProcess, this.batchSize); + debug('generated batches %s from %s items', batches.length, data.length); + + batches.forEach(function(batch) { + // If this batch is full or the queue is empty queue it for processing. + if (batch.length === self.batchSize || !self.queue.length()) { + self.queue.push({ + data: batch, + owner: self, + }, function(error, result) { + self._notify(error, JSON.parse(result.body), batch); + }); + } + // Otherwise, it store it for later. + else { + debug('the last batch with only %s item is deferred (partial batch)', batch.length); + self.pendingItems = batch; + } + }); + + debug('batches in queue: %s', this.queue.length()); + debug('items in deferred queue: %s', this.pendingItems.length); +}; + +/** + * Send a batch of contacts to Sendgrid. + * + * @param {Object} task Task to be processed (data in 'data' property) + * @param {Function} callback Callback function. + */ +ContactImporter.prototype._worker = function(task, callback) { + var context = task.owner; + debug('processing batch (%s items)', task.data.length); + context.throttle.submit(context._sendBatch, context, task.data, callback); +}; + +ContactImporter.prototype._sendBatch = function(context, data, callback) { + debug('sending batch (%s items)', data.length); + + var request = context.sg.emptyRequest(); + request.method = 'POST'; + request.path = '/v3/contactdb/recipients'; + request.body = data; + + context.sg.API(request) + .then(function(response) { + debug('got response: %o', response); + return callback(null, response); + }) + .catch(function(error) { + debug('got error, %o', error); + return callback(error); + }); +}; + +/** + * Emit the result of processing a batch. + * + * @param {Object} error + * @param {Object} result + */ +ContactImporter.prototype._notify = function(error, result, batch) { + if (error) { + return this.emit('error', result, batch); + } + return this.emit('success', result, batch); +}; |