permessage-deflate.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. 'use strict';
  2. const zlib = require('zlib');
  3. const bufferUtil = require('./buffer-util');
  4. const Limiter = require('./limiter');
  5. const { kStatusCode } = require('./constants');
  6. const TRAILER = Buffer.from([0x00, 0x00, 0xff, 0xff]);
  7. const kPerMessageDeflate = Symbol('permessage-deflate');
  8. const kTotalLength = Symbol('total-length');
  9. const kCallback = Symbol('callback');
  10. const kBuffers = Symbol('buffers');
  11. const kError = Symbol('error');
  12. //
  13. // We limit zlib concurrency, which prevents severe memory fragmentation
  14. // as documented in https://github.com/nodejs/node/issues/8871#issuecomment-250915913
  15. // and https://github.com/websockets/ws/issues/1202
  16. //
  17. // Intentionally global; it's the global thread pool that's an issue.
  18. //
  19. let zlibLimiter;
  20. /**
  21. * permessage-deflate implementation.
  22. */
  23. class PerMessageDeflate {
  24. /**
  25. * Creates a PerMessageDeflate instance.
  26. *
  27. * @param {Object} [options] Configuration options
  28. * @param {(Boolean|Number)} [options.clientMaxWindowBits] Advertise support
  29. * for, or request, a custom client window size
  30. * @param {Boolean} [options.clientNoContextTakeover=false] Advertise/
  31. * acknowledge disabling of client context takeover
  32. * @param {Number} [options.concurrencyLimit=10] The number of concurrent
  33. * calls to zlib
  34. * @param {(Boolean|Number)} [options.serverMaxWindowBits] Request/confirm the
  35. * use of a custom server window size
  36. * @param {Boolean} [options.serverNoContextTakeover=false] Request/accept
  37. * disabling of server context takeover
  38. * @param {Number} [options.threshold=1024] Size (in bytes) below which
  39. * messages should not be compressed if context takeover is disabled
  40. * @param {Object} [options.zlibDeflateOptions] Options to pass to zlib on
  41. * deflate
  42. * @param {Object} [options.zlibInflateOptions] Options to pass to zlib on
  43. * inflate
  44. * @param {Boolean} [isServer=false] Create the instance in either server or
  45. * client mode
  46. * @param {Number} [maxPayload=0] The maximum allowed message length
  47. */
  48. constructor(options, isServer, maxPayload) {
  49. this._maxPayload = maxPayload | 0;
  50. this._options = options || {};
  51. this._threshold =
  52. this._options.threshold !== undefined ? this._options.threshold : 1024;
  53. this._isServer = !!isServer;
  54. this._deflate = null;
  55. this._inflate = null;
  56. this.params = null;
  57. if (!zlibLimiter) {
  58. const concurrency =
  59. this._options.concurrencyLimit !== undefined
  60. ? this._options.concurrencyLimit
  61. : 10;
  62. zlibLimiter = new Limiter(concurrency);
  63. }
  64. }
  65. /**
  66. * @type {String}
  67. */
  68. static get extensionName() {
  69. return 'permessage-deflate';
  70. }
  71. /**
  72. * Create an extension negotiation offer.
  73. *
  74. * @return {Object} Extension parameters
  75. * @public
  76. */
  77. offer() {
  78. const params = {};
  79. if (this._options.serverNoContextTakeover) {
  80. params.server_no_context_takeover = true;
  81. }
  82. if (this._options.clientNoContextTakeover) {
  83. params.client_no_context_takeover = true;
  84. }
  85. if (this._options.serverMaxWindowBits) {
  86. params.server_max_window_bits = this._options.serverMaxWindowBits;
  87. }
  88. if (this._options.clientMaxWindowBits) {
  89. params.client_max_window_bits = this._options.clientMaxWindowBits;
  90. } else if (this._options.clientMaxWindowBits == null) {
  91. params.client_max_window_bits = true;
  92. }
  93. return params;
  94. }
  95. /**
  96. * Accept an extension negotiation offer/response.
  97. *
  98. * @param {Array} configurations The extension negotiation offers/reponse
  99. * @return {Object} Accepted configuration
  100. * @public
  101. */
  102. accept(configurations) {
  103. configurations = this.normalizeParams(configurations);
  104. this.params = this._isServer
  105. ? this.acceptAsServer(configurations)
  106. : this.acceptAsClient(configurations);
  107. return this.params;
  108. }
  109. /**
  110. * Releases all resources used by the extension.
  111. *
  112. * @public
  113. */
  114. cleanup() {
  115. if (this._inflate) {
  116. this._inflate.close();
  117. this._inflate = null;
  118. }
  119. if (this._deflate) {
  120. const callback = this._deflate[kCallback];
  121. this._deflate.close();
  122. this._deflate = null;
  123. if (callback) {
  124. callback(
  125. new Error(
  126. 'The deflate stream was closed while data was being processed'
  127. )
  128. );
  129. }
  130. }
  131. }
  132. /**
  133. * Accept an extension negotiation offer.
  134. *
  135. * @param {Array} offers The extension negotiation offers
  136. * @return {Object} Accepted configuration
  137. * @private
  138. */
  139. acceptAsServer(offers) {
  140. const opts = this._options;
  141. const accepted = offers.find((params) => {
  142. if (
  143. (opts.serverNoContextTakeover === false &&
  144. params.server_no_context_takeover) ||
  145. (params.server_max_window_bits &&
  146. (opts.serverMaxWindowBits === false ||
  147. (typeof opts.serverMaxWindowBits === 'number' &&
  148. opts.serverMaxWindowBits > params.server_max_window_bits))) ||
  149. (typeof opts.clientMaxWindowBits === 'number' &&
  150. !params.client_max_window_bits)
  151. ) {
  152. return false;
  153. }
  154. return true;
  155. });
  156. if (!accepted) {
  157. throw new Error('None of the extension offers can be accepted');
  158. }
  159. if (opts.serverNoContextTakeover) {
  160. accepted.server_no_context_takeover = true;
  161. }
  162. if (opts.clientNoContextTakeover) {
  163. accepted.client_no_context_takeover = true;
  164. }
  165. if (typeof opts.serverMaxWindowBits === 'number') {
  166. accepted.server_max_window_bits = opts.serverMaxWindowBits;
  167. }
  168. if (typeof opts.clientMaxWindowBits === 'number') {
  169. accepted.client_max_window_bits = opts.clientMaxWindowBits;
  170. } else if (
  171. accepted.client_max_window_bits === true ||
  172. opts.clientMaxWindowBits === false
  173. ) {
  174. delete accepted.client_max_window_bits;
  175. }
  176. return accepted;
  177. }
  178. /**
  179. * Accept the extension negotiation response.
  180. *
  181. * @param {Array} response The extension negotiation response
  182. * @return {Object} Accepted configuration
  183. * @private
  184. */
  185. acceptAsClient(response) {
  186. const params = response[0];
  187. if (
  188. this._options.clientNoContextTakeover === false &&
  189. params.client_no_context_takeover
  190. ) {
  191. throw new Error('Unexpected parameter "client_no_context_takeover"');
  192. }
  193. if (!params.client_max_window_bits) {
  194. if (typeof this._options.clientMaxWindowBits === 'number') {
  195. params.client_max_window_bits = this._options.clientMaxWindowBits;
  196. }
  197. } else if (
  198. this._options.clientMaxWindowBits === false ||
  199. (typeof this._options.clientMaxWindowBits === 'number' &&
  200. params.client_max_window_bits > this._options.clientMaxWindowBits)
  201. ) {
  202. throw new Error(
  203. 'Unexpected or invalid parameter "client_max_window_bits"'
  204. );
  205. }
  206. return params;
  207. }
  208. /**
  209. * Normalize parameters.
  210. *
  211. * @param {Array} configurations The extension negotiation offers/reponse
  212. * @return {Array} The offers/response with normalized parameters
  213. * @private
  214. */
  215. normalizeParams(configurations) {
  216. configurations.forEach((params) => {
  217. Object.keys(params).forEach((key) => {
  218. let value = params[key];
  219. if (value.length > 1) {
  220. throw new Error(`Parameter "${key}" must have only a single value`);
  221. }
  222. value = value[0];
  223. if (key === 'client_max_window_bits') {
  224. if (value !== true) {
  225. const num = +value;
  226. if (!Number.isInteger(num) || num < 8 || num > 15) {
  227. throw new TypeError(
  228. `Invalid value for parameter "${key}": ${value}`
  229. );
  230. }
  231. value = num;
  232. } else if (!this._isServer) {
  233. throw new TypeError(
  234. `Invalid value for parameter "${key}": ${value}`
  235. );
  236. }
  237. } else if (key === 'server_max_window_bits') {
  238. const num = +value;
  239. if (!Number.isInteger(num) || num < 8 || num > 15) {
  240. throw new TypeError(
  241. `Invalid value for parameter "${key}": ${value}`
  242. );
  243. }
  244. value = num;
  245. } else if (
  246. key === 'client_no_context_takeover' ||
  247. key === 'server_no_context_takeover'
  248. ) {
  249. if (value !== true) {
  250. throw new TypeError(
  251. `Invalid value for parameter "${key}": ${value}`
  252. );
  253. }
  254. } else {
  255. throw new Error(`Unknown parameter "${key}"`);
  256. }
  257. params[key] = value;
  258. });
  259. });
  260. return configurations;
  261. }
  262. /**
  263. * Decompress data. Concurrency limited.
  264. *
  265. * @param {Buffer} data Compressed data
  266. * @param {Boolean} fin Specifies whether or not this is the last fragment
  267. * @param {Function} callback Callback
  268. * @public
  269. */
  270. decompress(data, fin, callback) {
  271. zlibLimiter.add((done) => {
  272. this._decompress(data, fin, (err, result) => {
  273. done();
  274. callback(err, result);
  275. });
  276. });
  277. }
  278. /**
  279. * Compress data. Concurrency limited.
  280. *
  281. * @param {(Buffer|String)} data Data to compress
  282. * @param {Boolean} fin Specifies whether or not this is the last fragment
  283. * @param {Function} callback Callback
  284. * @public
  285. */
  286. compress(data, fin, callback) {
  287. zlibLimiter.add((done) => {
  288. this._compress(data, fin, (err, result) => {
  289. done();
  290. callback(err, result);
  291. });
  292. });
  293. }
  294. /**
  295. * Decompress data.
  296. *
  297. * @param {Buffer} data Compressed data
  298. * @param {Boolean} fin Specifies whether or not this is the last fragment
  299. * @param {Function} callback Callback
  300. * @private
  301. */
  302. _decompress(data, fin, callback) {
  303. const endpoint = this._isServer ? 'client' : 'server';
  304. if (!this._inflate) {
  305. const key = `${endpoint}_max_window_bits`;
  306. const windowBits =
  307. typeof this.params[key] !== 'number'
  308. ? zlib.Z_DEFAULT_WINDOWBITS
  309. : this.params[key];
  310. this._inflate = zlib.createInflateRaw({
  311. ...this._options.zlibInflateOptions,
  312. windowBits
  313. });
  314. this._inflate[kPerMessageDeflate] = this;
  315. this._inflate[kTotalLength] = 0;
  316. this._inflate[kBuffers] = [];
  317. this._inflate.on('error', inflateOnError);
  318. this._inflate.on('data', inflateOnData);
  319. }
  320. this._inflate[kCallback] = callback;
  321. this._inflate.write(data);
  322. if (fin) this._inflate.write(TRAILER);
  323. this._inflate.flush(() => {
  324. const err = this._inflate[kError];
  325. if (err) {
  326. this._inflate.close();
  327. this._inflate = null;
  328. callback(err);
  329. return;
  330. }
  331. const data = bufferUtil.concat(
  332. this._inflate[kBuffers],
  333. this._inflate[kTotalLength]
  334. );
  335. if (this._inflate._readableState.endEmitted) {
  336. this._inflate.close();
  337. this._inflate = null;
  338. } else {
  339. this._inflate[kTotalLength] = 0;
  340. this._inflate[kBuffers] = [];
  341. if (fin && this.params[`${endpoint}_no_context_takeover`]) {
  342. this._inflate.reset();
  343. }
  344. }
  345. callback(null, data);
  346. });
  347. }
  348. /**
  349. * Compress data.
  350. *
  351. * @param {(Buffer|String)} data Data to compress
  352. * @param {Boolean} fin Specifies whether or not this is the last fragment
  353. * @param {Function} callback Callback
  354. * @private
  355. */
  356. _compress(data, fin, callback) {
  357. const endpoint = this._isServer ? 'server' : 'client';
  358. if (!this._deflate) {
  359. const key = `${endpoint}_max_window_bits`;
  360. const windowBits =
  361. typeof this.params[key] !== 'number'
  362. ? zlib.Z_DEFAULT_WINDOWBITS
  363. : this.params[key];
  364. this._deflate = zlib.createDeflateRaw({
  365. ...this._options.zlibDeflateOptions,
  366. windowBits
  367. });
  368. this._deflate[kTotalLength] = 0;
  369. this._deflate[kBuffers] = [];
  370. this._deflate.on('data', deflateOnData);
  371. }
  372. this._deflate[kCallback] = callback;
  373. this._deflate.write(data);
  374. this._deflate.flush(zlib.Z_SYNC_FLUSH, () => {
  375. if (!this._deflate) {
  376. //
  377. // The deflate stream was closed while data was being processed.
  378. //
  379. return;
  380. }
  381. let data = bufferUtil.concat(
  382. this._deflate[kBuffers],
  383. this._deflate[kTotalLength]
  384. );
  385. if (fin) data = data.slice(0, data.length - 4);
  386. //
  387. // Ensure that the callback will not be called again in
  388. // `PerMessageDeflate#cleanup()`.
  389. //
  390. this._deflate[kCallback] = null;
  391. this._deflate[kTotalLength] = 0;
  392. this._deflate[kBuffers] = [];
  393. if (fin && this.params[`${endpoint}_no_context_takeover`]) {
  394. this._deflate.reset();
  395. }
  396. callback(null, data);
  397. });
  398. }
  399. }
  400. module.exports = PerMessageDeflate;
  401. /**
  402. * The listener of the `zlib.DeflateRaw` stream `'data'` event.
  403. *
  404. * @param {Buffer} chunk A chunk of data
  405. * @private
  406. */
  407. function deflateOnData(chunk) {
  408. this[kBuffers].push(chunk);
  409. this[kTotalLength] += chunk.length;
  410. }
  411. /**
  412. * The listener of the `zlib.InflateRaw` stream `'data'` event.
  413. *
  414. * @param {Buffer} chunk A chunk of data
  415. * @private
  416. */
  417. function inflateOnData(chunk) {
  418. this[kTotalLength] += chunk.length;
  419. if (
  420. this[kPerMessageDeflate]._maxPayload < 1 ||
  421. this[kTotalLength] <= this[kPerMessageDeflate]._maxPayload
  422. ) {
  423. this[kBuffers].push(chunk);
  424. return;
  425. }
  426. this[kError] = new RangeError('Max payload size exceeded');
  427. this[kError].code = 'WS_ERR_UNSUPPORTED_MESSAGE_LENGTH';
  428. this[kError][kStatusCode] = 1009;
  429. this.removeListener('data', inflateOnData);
  430. this.reset();
  431. }
  432. /**
  433. * The listener of the `zlib.InflateRaw` stream `'error'` event.
  434. *
  435. * @param {Error} err The emitted error
  436. * @private
  437. */
  438. function inflateOnError(err) {
  439. //
  440. // There is no need to call `Zlib#close()` as the handle is automatically
  441. // closed when an error is emitted.
  442. //
  443. this[kPerMessageDeflate]._inflate = null;
  444. err[kStatusCode] = 1007;
  445. this[kCallback](err);
  446. }