stream-utils.js 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
  4. "use strict";
  5. const { Ci, Cc, Cu, Cr, CC } = require("chrome");
  6. const Services = require("Services");
  7. const DevToolsUtils = require("devtools/shared/DevToolsUtils");
  8. const { dumpv } = DevToolsUtils;
  9. const EventEmitter = require("devtools/shared/event-emitter");
  10. const promise = require("promise");
  11. const defer = require("devtools/shared/defer");
  12. DevToolsUtils.defineLazyGetter(this, "IOUtil", () => {
  13. return Cc["@mozilla.org/io-util;1"].getService(Ci.nsIIOUtil);
  14. });
  15. DevToolsUtils.defineLazyGetter(this, "ScriptableInputStream", () => {
  16. return CC("@mozilla.org/scriptableinputstream;1",
  17. "nsIScriptableInputStream", "init");
  18. });
  19. const BUFFER_SIZE = 0x8000;
  20. /**
  21. * This helper function (and its companion object) are used by bulk senders and
  22. * receivers to read and write data in and out of other streams. Functions that
  23. * make use of this tool are passed to callers when it is time to read or write
  24. * bulk data. It is highly recommended to use these copier functions instead of
  25. * the stream directly because the copier enforces the agreed upon length.
  26. * Since bulk mode reuses an existing stream, the sender and receiver must write
  27. * and read exactly the agreed upon amount of data, or else the entire transport
  28. * will be left in a invalid state. Additionally, other methods of stream
  29. * copying (such as NetUtil.asyncCopy) close the streams involved, which would
  30. * terminate the debugging transport, and so it is avoided here.
  31. *
  32. * Overall, this *works*, but clearly the optimal solution would be able to just
  33. * use the streams directly. If it were possible to fully implement
  34. * nsIInputStream / nsIOutputStream in JS, wrapper streams could be created to
  35. * enforce the length and avoid closing, and consumers could use familiar stream
  36. * utilities like NetUtil.asyncCopy.
  37. *
  38. * The function takes two async streams and copies a precise number of bytes
  39. * from one to the other. Copying begins immediately, but may complete at some
  40. * future time depending on data size. Use the returned promise to know when
  41. * it's complete.
  42. *
  43. * @param input nsIAsyncInputStream
  44. * The stream to copy from.
  45. * @param output nsIAsyncOutputStream
  46. * The stream to copy to.
  47. * @param length Integer
  48. * The amount of data that needs to be copied.
  49. * @return Promise
  50. * The promise is resolved when copying completes or rejected if any
  51. * (unexpected) errors occur.
  52. */
  53. function copyStream(input, output, length) {
  54. let copier = new StreamCopier(input, output, length);
  55. return copier.copy();
  56. }
  57. function StreamCopier(input, output, length) {
  58. EventEmitter.decorate(this);
  59. this._id = StreamCopier._nextId++;
  60. this.input = input;
  61. // Save off the base output stream, since we know it's async as we've required
  62. this.baseAsyncOutput = output;
  63. if (IOUtil.outputStreamIsBuffered(output)) {
  64. this.output = output;
  65. } else {
  66. this.output = Cc["@mozilla.org/network/buffered-output-stream;1"].
  67. createInstance(Ci.nsIBufferedOutputStream);
  68. this.output.init(output, BUFFER_SIZE);
  69. }
  70. this._length = length;
  71. this._amountLeft = length;
  72. this._deferred = defer();
  73. this._copy = this._copy.bind(this);
  74. this._flush = this._flush.bind(this);
  75. this._destroy = this._destroy.bind(this);
  76. // Copy promise's then method up to this object.
  77. // Allows the copier to offer a promise interface for the simple succeed or
  78. // fail scenarios, but also emit events (due to the EventEmitter) for other
  79. // states, like progress.
  80. this.then = this._deferred.promise.then.bind(this._deferred.promise);
  81. this.then(this._destroy, this._destroy);
  82. // Stream ready callback starts as |_copy|, but may switch to |_flush| at end
  83. // if flushing would block the output stream.
  84. this._streamReadyCallback = this._copy;
  85. }
  86. StreamCopier._nextId = 0;
  87. StreamCopier.prototype = {
  88. copy: function () {
  89. // Dispatch to the next tick so that it's possible to attach a progress
  90. // event listener, even for extremely fast copies (like when testing).
  91. Services.tm.currentThread.dispatch(() => {
  92. try {
  93. this._copy();
  94. } catch (e) {
  95. this._deferred.reject(e);
  96. }
  97. }, 0);
  98. return this;
  99. },
  100. _copy: function () {
  101. let bytesAvailable = this.input.available();
  102. let amountToCopy = Math.min(bytesAvailable, this._amountLeft);
  103. this._debug("Trying to copy: " + amountToCopy);
  104. let bytesCopied;
  105. try {
  106. bytesCopied = this.output.writeFrom(this.input, amountToCopy);
  107. } catch (e) {
  108. if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK) {
  109. this._debug("Base stream would block, will retry");
  110. this._debug("Waiting for output stream");
  111. this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
  112. return;
  113. } else {
  114. throw e;
  115. }
  116. }
  117. this._amountLeft -= bytesCopied;
  118. this._debug("Copied: " + bytesCopied +
  119. ", Left: " + this._amountLeft);
  120. this._emitProgress();
  121. if (this._amountLeft === 0) {
  122. this._debug("Copy done!");
  123. this._flush();
  124. return;
  125. }
  126. this._debug("Waiting for input stream");
  127. this.input.asyncWait(this, 0, 0, Services.tm.currentThread);
  128. },
  129. _emitProgress: function () {
  130. this.emit("progress", {
  131. bytesSent: this._length - this._amountLeft,
  132. totalBytes: this._length
  133. });
  134. },
  135. _flush: function () {
  136. try {
  137. this.output.flush();
  138. } catch (e) {
  139. if (e.result == Cr.NS_BASE_STREAM_WOULD_BLOCK ||
  140. e.result == Cr.NS_ERROR_FAILURE) {
  141. this._debug("Flush would block, will retry");
  142. this._streamReadyCallback = this._flush;
  143. this._debug("Waiting for output stream");
  144. this.baseAsyncOutput.asyncWait(this, 0, 0, Services.tm.currentThread);
  145. return;
  146. } else {
  147. throw e;
  148. }
  149. }
  150. this._deferred.resolve();
  151. },
  152. _destroy: function () {
  153. this._destroy = null;
  154. this._copy = null;
  155. this._flush = null;
  156. this.input = null;
  157. this.output = null;
  158. },
  159. // nsIInputStreamCallback
  160. onInputStreamReady: function () {
  161. this._streamReadyCallback();
  162. },
  163. // nsIOutputStreamCallback
  164. onOutputStreamReady: function () {
  165. this._streamReadyCallback();
  166. },
  167. _debug: function (msg) {
  168. // Prefix logs with the copier ID, which makes logs much easier to
  169. // understand when several copiers are running simultaneously
  170. dumpv("Copier: " + this._id + " " + msg);
  171. }
  172. };
  173. /**
  174. * Read from a stream, one byte at a time, up to the next |delimiter|
  175. * character, but stopping if we've read |count| without finding it. Reading
  176. * also terminates early if there are less than |count| bytes available on the
  177. * stream. In that case, we only read as many bytes as the stream currently has
  178. * to offer.
  179. * TODO: This implementation could be removed if bug 984651 is fixed, which
  180. * provides a native version of the same idea.
  181. * @param stream nsIInputStream
  182. * The input stream to read from.
  183. * @param delimiter string
  184. * The character we're trying to find.
  185. * @param count integer
  186. * The max number of characters to read while searching.
  187. * @return string
  188. * The data collected. If the delimiter was found, this string will
  189. * end with it.
  190. */
  191. function delimitedRead(stream, delimiter, count) {
  192. dumpv("Starting delimited read for " + delimiter + " up to " +
  193. count + " bytes");
  194. let scriptableStream;
  195. if (stream instanceof Ci.nsIScriptableInputStream) {
  196. scriptableStream = stream;
  197. } else {
  198. scriptableStream = new ScriptableInputStream(stream);
  199. }
  200. let data = "";
  201. // Don't exceed what's available on the stream
  202. count = Math.min(count, stream.available());
  203. if (count <= 0) {
  204. return data;
  205. }
  206. let char;
  207. while (char !== delimiter && count > 0) {
  208. char = scriptableStream.readBytes(1);
  209. count--;
  210. data += char;
  211. }
  212. return data;
  213. }
  214. module.exports = {
  215. copyStream: copyStream,
  216. delimitedRead: delimitedRead
  217. };