_http_agent.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // Copyright Joyent, Inc. and other Node contributors.
  2. //
  3. // Permission is hereby granted, free of charge, to any person obtaining a
  4. // copy of this software and associated documentation files (the
  5. // "Software"), to deal in the Software without restriction, including
  6. // without limitation the rights to use, copy, modify, merge, publish,
  7. // distribute, sublicense, and/or sell copies of the Software, and to permit
  8. // persons to whom the Software is furnished to do so, subject to the
  9. // following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included
  12. // in all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  15. // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  16. // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
  17. // NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
  18. // DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
  19. // OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
  20. // USE OR OTHER DEALINGS IN THE SOFTWARE.
  21. // patch from https://github.com/nodejs/node/blob/v7.2.1/lib/_http_agent.js
  22. 'use strict';
  23. const net = require('net');
  24. const util = require('util');
  25. const EventEmitter = require('events');
  26. const debug = util.debuglog('http');
  27. // New Agent code.
  28. // The largest departure from the previous implementation is that
  29. // an Agent instance holds connections for a variable number of host:ports.
  30. // Surprisingly, this is still API compatible as far as third parties are
  31. // concerned. The only code that really notices the difference is the
  32. // request object.
  33. // Another departure is that all code related to HTTP parsing is in
  34. // ClientRequest.onSocket(). The Agent is now *strictly*
  35. // concerned with managing a connection pool.
  36. function Agent(options) {
  37. if (!(this instanceof Agent))
  38. return new Agent(options);
  39. EventEmitter.call(this);
  40. var self = this;
  41. self.defaultPort = 80;
  42. self.protocol = 'http:';
  43. self.options = util._extend({}, options);
  44. // don't confuse net and make it think that we're connecting to a pipe
  45. self.options.path = null;
  46. self.requests = {};
  47. self.sockets = {};
  48. self.freeSockets = {};
  49. self.keepAliveMsecs = self.options.keepAliveMsecs || 1000;
  50. self.keepAlive = self.options.keepAlive || false;
  51. self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets;
  52. self.maxFreeSockets = self.options.maxFreeSockets || 256;
  53. // [patch start]
  54. // free keep-alive socket timeout. By default free socket do not have a timeout.
  55. self.freeSocketKeepAliveTimeout = self.options.freeSocketKeepAliveTimeout || 0;
  56. // working socket timeout. By default working socket do not have a timeout.
  57. self.timeout = self.options.timeout || 0;
  58. // the socket active time to live, even if it's in use
  59. this.socketActiveTTL = this.options.socketActiveTTL || null;
  60. // [patch end]
  61. self.on('free', function(socket, options) {
  62. var name = self.getName(options);
  63. debug('agent.on(free)', name);
  64. if (socket.writable &&
  65. self.requests[name] && self.requests[name].length) {
  66. // [patch start]
  67. debug('continue handle next request');
  68. // [patch end]
  69. self.requests[name].shift().onSocket(socket);
  70. if (self.requests[name].length === 0) {
  71. // don't leak
  72. delete self.requests[name];
  73. }
  74. } else {
  75. // If there are no pending requests, then put it in
  76. // the freeSockets pool, but only if we're allowed to do so.
  77. var req = socket._httpMessage;
  78. if (req &&
  79. req.shouldKeepAlive &&
  80. socket.writable &&
  81. self.keepAlive) {
  82. var freeSockets = self.freeSockets[name];
  83. var freeLen = freeSockets ? freeSockets.length : 0;
  84. var count = freeLen;
  85. if (self.sockets[name])
  86. count += self.sockets[name].length;
  87. if (count > self.maxSockets || freeLen >= self.maxFreeSockets) {
  88. socket.destroy();
  89. } else {
  90. freeSockets = freeSockets || [];
  91. self.freeSockets[name] = freeSockets;
  92. socket.setKeepAlive(true, self.keepAliveMsecs);
  93. socket.unref();
  94. socket._httpMessage = null;
  95. self.removeSocket(socket, options);
  96. freeSockets.push(socket);
  97. // [patch start]
  98. // Add a default error handler to avoid Unhandled 'error' event throw on idle socket
  99. // https://github.com/node-modules/agentkeepalive/issues/25
  100. // https://github.com/nodejs/node/pull/4482 (fixed in >= 4.4.0 and >= 5.4.0)
  101. if (socket.listeners('error').length === 0) {
  102. socket.once('error', freeSocketErrorListener);
  103. }
  104. // set free keepalive timer
  105. // try to use socket custom freeSocketKeepAliveTimeout first
  106. const freeSocketKeepAliveTimeout = socket.freeSocketKeepAliveTimeout || self.freeSocketKeepAliveTimeout;
  107. socket.setTimeout(freeSocketKeepAliveTimeout);
  108. debug(`push to free socket queue and wait for ${freeSocketKeepAliveTimeout}ms`);
  109. // [patch end]
  110. }
  111. } else {
  112. socket.destroy();
  113. }
  114. }
  115. });
  116. }
  117. util.inherits(Agent, EventEmitter);
  118. exports.Agent = Agent;
  119. // [patch start]
  120. function freeSocketErrorListener(err) {
  121. var socket = this;
  122. debug('SOCKET ERROR on FREE socket:', err.message, err.stack);
  123. socket.destroy();
  124. socket.emit('agentRemove');
  125. }
  126. // [patch end]
  127. Agent.defaultMaxSockets = Infinity;
  128. Agent.prototype.createConnection = net.createConnection;
  129. // Get the key for a given set of request options
  130. Agent.prototype.getName = function getName(options) {
  131. var name = options.host || 'localhost';
  132. name += ':';
  133. if (options.port)
  134. name += options.port;
  135. name += ':';
  136. if (options.localAddress)
  137. name += options.localAddress;
  138. // Pacify parallel/test-http-agent-getname by only appending
  139. // the ':' when options.family is set.
  140. if (options.family === 4 || options.family === 6)
  141. name += ':' + options.family;
  142. return name;
  143. };
  144. // [patch start]
  145. function handleSocketCreation(req) {
  146. return function(err, newSocket) {
  147. if (err) {
  148. process.nextTick(function() {
  149. req.emit('error', err);
  150. });
  151. return;
  152. }
  153. req.onSocket(newSocket);
  154. }
  155. }
  156. // [patch end]
  157. Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
  158. localAddress/*legacy*/) {
  159. // Legacy API: addRequest(req, host, port, localAddress)
  160. if (typeof options === 'string') {
  161. options = {
  162. host: options,
  163. port,
  164. localAddress
  165. };
  166. }
  167. options = util._extend({}, options);
  168. options = util._extend(options, this.options);
  169. if (!options.servername)
  170. options.servername = calculateServerName(options, req);
  171. var name = this.getName(options);
  172. if (!this.sockets[name]) {
  173. this.sockets[name] = [];
  174. }
  175. var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0;
  176. var sockLen = freeLen + this.sockets[name].length;
  177. if (freeLen) {
  178. // we have a free socket, so use that.
  179. var socket = this.freeSockets[name].shift();
  180. debug('have free socket');
  181. // [patch start]
  182. // remove free socket error event handler
  183. socket.removeListener('error', freeSocketErrorListener);
  184. // restart the default timer
  185. socket.setTimeout(this.timeout);
  186. if (this.socketActiveTTL && Date.now() - socket.createdTime > this.socketActiveTTL) {
  187. debug(`socket ${socket.createdTime} expired`);
  188. socket.destroy();
  189. return this.createSocket(req, options, handleSocketCreation(req));
  190. }
  191. // [patch end]
  192. // don't leak
  193. if (!this.freeSockets[name].length)
  194. delete this.freeSockets[name];
  195. socket.ref();
  196. req.onSocket(socket);
  197. this.sockets[name].push(socket);
  198. } else if (sockLen < this.maxSockets) {
  199. debug('call onSocket', sockLen, freeLen);
  200. // If we are under maxSockets create a new one.
  201. // [patch start]
  202. this.createSocket(req, options, handleSocketCreation(req));
  203. // [patch end]
  204. } else {
  205. debug('wait for socket');
  206. // We are over limit so we'll add it to the queue.
  207. if (!this.requests[name]) {
  208. this.requests[name] = [];
  209. }
  210. this.requests[name].push(req);
  211. }
  212. };
  213. Agent.prototype.createSocket = function createSocket(req, options, cb) {
  214. var self = this;
  215. options = util._extend({}, options);
  216. options = util._extend(options, self.options);
  217. if (!options.servername)
  218. options.servername = calculateServerName(options, req);
  219. var name = self.getName(options);
  220. options._agentKey = name;
  221. debug('createConnection', name, options);
  222. options.encoding = null;
  223. var called = false;
  224. const newSocket = self.createConnection(options, oncreate);
  225. // [patch start]
  226. if (newSocket) {
  227. oncreate(null, Object.assign(newSocket, { createdTime: Date.now() }));
  228. }
  229. // [patch end]
  230. function oncreate(err, s) {
  231. if (called)
  232. return;
  233. called = true;
  234. if (err)
  235. return cb(err);
  236. if (!self.sockets[name]) {
  237. self.sockets[name] = [];
  238. }
  239. self.sockets[name].push(s);
  240. debug('sockets', name, self.sockets[name].length);
  241. function onFree() {
  242. self.emit('free', s, options);
  243. }
  244. s.on('free', onFree);
  245. function onClose(err) {
  246. debug('CLIENT socket onClose');
  247. // This is the only place where sockets get removed from the Agent.
  248. // If you want to remove a socket from the pool, just close it.
  249. // All socket errors end in a close event anyway.
  250. self.removeSocket(s, options);
  251. // [patch start]
  252. self.emit('close');
  253. // [patch end]
  254. }
  255. s.on('close', onClose);
  256. // [patch start]
  257. // start socket timeout handler
  258. function onTimeout() {
  259. debug('CLIENT socket onTimeout');
  260. s.destroy();
  261. // Remove it from freeSockets immediately to prevent new requests from being sent through this socket.
  262. self.removeSocket(s, options);
  263. self.emit('timeout');
  264. }
  265. s.on('timeout', onTimeout);
  266. // set the default timer
  267. s.setTimeout(self.timeout);
  268. // [patch end]
  269. function onRemove() {
  270. // We need this function for cases like HTTP 'upgrade'
  271. // (defined by WebSockets) where we need to remove a socket from the
  272. // pool because it'll be locked up indefinitely
  273. debug('CLIENT socket onRemove');
  274. self.removeSocket(s, options);
  275. s.removeListener('close', onClose);
  276. s.removeListener('free', onFree);
  277. s.removeListener('agentRemove', onRemove);
  278. // [patch start]
  279. // remove socket timeout handler
  280. s.setTimeout(0, onTimeout);
  281. // [patch end]
  282. }
  283. s.on('agentRemove', onRemove);
  284. cb(null, s);
  285. }
  286. };
  287. function calculateServerName(options, req) {
  288. let servername = options.host;
  289. const hostHeader = req.getHeader('host');
  290. if (hostHeader) {
  291. // abc => abc
  292. // abc:123 => abc
  293. // [::1] => ::1
  294. // [::1]:123 => ::1
  295. if (hostHeader.startsWith('[')) {
  296. const index = hostHeader.indexOf(']');
  297. if (index === -1) {
  298. // Leading '[', but no ']'. Need to do something...
  299. servername = hostHeader;
  300. } else {
  301. servername = hostHeader.substr(1, index - 1);
  302. }
  303. } else {
  304. servername = hostHeader.split(':', 1)[0];
  305. }
  306. }
  307. return servername;
  308. }
  309. Agent.prototype.removeSocket = function removeSocket(s, options) {
  310. var name = this.getName(options);
  311. debug('removeSocket', name, 'writable:', s.writable);
  312. var sets = [this.sockets];
  313. // If the socket was destroyed, remove it from the free buffers too.
  314. if (!s.writable)
  315. sets.push(this.freeSockets);
  316. for (var sk = 0; sk < sets.length; sk++) {
  317. var sockets = sets[sk];
  318. if (sockets[name]) {
  319. var index = sockets[name].indexOf(s);
  320. if (index !== -1) {
  321. sockets[name].splice(index, 1);
  322. // Don't leak
  323. if (sockets[name].length === 0)
  324. delete sockets[name];
  325. }
  326. }
  327. }
  328. // [patch start]
  329. var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0;
  330. var sockLen = freeLen + this.sockets[name] ? this.sockets[name].length : 0;
  331. // [patch end]
  332. if (this.requests[name] && this.requests[name].length && sockLen < this.maxSockets) {
  333. debug('removeSocket, have a request, make a socket');
  334. var req = this.requests[name][0];
  335. // If we have pending requests and a socket gets closed make a new one
  336. this.createSocket(req, options, function(err, newSocket) {
  337. if (err) {
  338. process.nextTick(function() {
  339. req.emit('error', err);
  340. });
  341. return;
  342. }
  343. newSocket.emit('free');
  344. });
  345. }
  346. };
  347. Agent.prototype.destroy = function destroy() {
  348. var sets = [this.freeSockets, this.sockets];
  349. for (var s = 0; s < sets.length; s++) {
  350. var set = sets[s];
  351. var keys = Object.keys(set);
  352. for (var v = 0; v < keys.length; v++) {
  353. var setName = set[keys[v]];
  354. for (var n = 0; n < setName.length; n++) {
  355. setName[n].destroy();
  356. }
  357. }
  358. }
  359. };
  360. exports.globalAgent = new Agent();