All files / Nodejs/lib/template api-request.js

0% Statements 0/65
0% Branches 0/18
0% Functions 0/15
0% Lines 0/63

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147                                                                                                                                                                                                                                                                                                     
var http = require("http");
var https = require("https");
var URL = require("url");
var extend = require("extend");
var async = require("async");
var moment = require("moment");
var streams = require("../streams");
 
module.exports = function (opts) {
	if (typeof opts.batch == "number") {
		opts.batch = {
			count: opts.batch
		};
	}
	opts = extend(true, {
		batch: {
			count: 40,
			time: {
				seconds: 10
			}
		},
		retries: 3,
		backoff: function (retryCount) {
			return {
				seconds: Math.pow(2, retryCount)
			}
		}
	}, opts);
 
	return streams.pipeline(
		streams.batch(opts.batch),
		streams.through(function (event, done) {
			var responses = [];
			var track = {
				lastKey: -1,
				count: 0
			};
 
			let payload = event.payload;
 
			if (!Array.isArray(payload)) {
				payload = meta;
			}
 
			let tryEmit = (callback) => {
				var start = track.lastKey + 1;
				for (var i = start; i < responses.length; i++) {
					var kn = responses[i];
					if (kn == undefined) {
						break;
					}
					this.push(Object.assign({}, payload[i], {
						payload: kn
					}));
					track.lastKey = i
					delete responses[i];
				}
				callback();
			};
 
			let api = opts.api || doAPI;
			async.eachOfLimit(payload, payload.length, (event, key, singleDone) => {
 
				let errorCount = 0;
 
				function makeRequest() {
					api.call({
						event: event,
						api: doAPI
					}, opts.url, Object.assign({}, event.payload), {
						headers: opts.headers
					}, opts.request).then(response => {
						responses[key] = response;
						tryEmit(singleDone)
					}).catch(err => {
						errorCount++;
						if (errorCount <= opts.retries) {
							let backoff = moment.duration(opts.backoff(errorCount));
							console.log(event.eid, "Backing Off", backoff.toString())
							setTimeout(makeRequest, Math.min(2147483647, backoff.asMilliseconds())); // Max value allowed by setTimeout 
						} else {
							console.log(event.eid, "No more retires")
							singleDone(err);
						}
					});
				}
 
				makeRequest();
			}, (err, result) => {
				if (err) {
					console.log("Error with batch", err);
					done(err);
				} else {
					tryEmit(done);
				}
			});
 
		})
	)
};
 
 
function doAPI(url, data, opts) {
	var api = http;
	if (url.match(/^https/)) {
		api = https;
	}
 
	opts = Object.assign(URL.parse(url), extend(true, {
		method: "POST",
		headers: {
			'Content-Type': 'application/json',
		},
		timeout: 1000 * 60 * 4
	}, opts));
 
	//console.log("Sending Request", url)
	return new Promise((resolve, reject) => {
		var req = api.request(opts, (res) => {
			var result = "";
			res.on("data", (chunk) => result += chunk);
			res.on("end", () => {
				try {
					var r = JSON.parse(result);
					resolve(r)
				} catch (err) {
					//console.log("Rejecting", err)
					reject({
						response: result,
						message: err.message
					});
				}
			});
		}).on("error", (err) => {
			//console.log("Rejecting", err)
			reject({
				message: err.message
			});
		});
 
		if (opts.method === "POST" && data != undefined) {
			req.write(JSON.stringify(data));
		}
		req.end();
	});
 
};