// graceful-fs.js (12 KB)
  1. var fs = require('fs')
  2. var polyfills = require('./polyfills.js')
  3. var legacy = require('./legacy-streams.js')
  4. var clone = require('./clone.js')
  5. var util = require('util')
  // Property keys used by this module: gracefulQueue is the key the shared
  // retry queue is published under, previousSymbol is the key under which a
  // patched function keeps a reference to the function it replaced.
  /* istanbul ignore next - node 0.x polyfill */
  var gracefulQueue
  var previousSymbol
  /* istanbul ignore if - node 0.x polyfill */
  if (typeof Symbol !== 'function' || typeof Symbol.for !== 'function') {
    // No symbol registry on this runtime: fall back to plain string keys.
    gracefulQueue = '___graceful-fs.queue'
    previousSymbol = '___graceful-fs.previous'
  } else {
    gracefulQueue = Symbol.for('graceful-fs.queue')
    // This is used in testing by future versions
    previousSymbol = Symbol.for('graceful-fs.previous')
  }
  // Does nothing and returns undefined; used as the default debug logger.
  function noop () {
    // intentionally empty
  }
  // Expose `queue` on `context` under the gracefulQueue key.
  // The property is defined with a getter only, so reading it always yields
  // the same queue object. No setter is supplied and the other descriptor
  // fields are left at the Object.defineProperty defaults.
  function publishQueue (context, queue) {
    var accessor = {
      get: function () {
        return queue
      }
    }
    Object.defineProperty(context, gracefulQueue, accessor)
  }
  // Choose the debug logger. It stays a no-op unless one of the following
  // applies:
  //  - util.debuglog exists: use the logger it returns for the 'gfs4' section
  //  - otherwise, NODE_DEBUG mentions gfs4: format the arguments and write
  //    them to stderr with every line prefixed by 'GFS4: '
  var debug = noop
  if (util.debuglog) {
    debug = util.debuglog('gfs4')
  } else if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
    debug = function () {
      var formatted = util.format.apply(util, arguments)
      var prefixed = 'GFS4: ' + formatted.split(/\n/).join('\nGFS4: ')
      console.error(prefixed)
    }
  }
  // One-time initialization: skipped when a queue has already been published
  // on the fs module.
  if (!fs[gracefulQueue]) {
    // This queue can be shared by multiple loaded instances
    var queue = global[gracefulQueue] || []
    publishQueue(fs, queue)

    // Patch fs.close/closeSync to shared queue version, because we need
    // to retry() whenever a close happens *anywhere* in the program.
    // This is essential when multiple graceful-fs instances are
    // in play at the same time.
    fs.close = (function (fs$close) {
      function close (fd, cb) {
        // Runs when the underlying close completes.
        function onClosed (err) {
          // This function uses the graceful-fs shared queue
          if (!err) {
            resetQueue()
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        }

        return fs$close.call(fs, fd, onClosed)
      }

      // Keep a reference to the function that was replaced.
      Object.defineProperty(close, previousSymbol, { value: fs$close })
      return close
    })(fs.close)

    fs.closeSync = (function (fs$closeSync) {
      function closeSync (fd) {
        // This function uses the graceful-fs shared queue
        // (resetQueue is only reached when the underlying call did not throw)
        fs$closeSync.apply(fs, arguments)
        resetQueue()
      }

      // Keep a reference to the function that was replaced.
      Object.defineProperty(closeSync, previousSymbol, { value: fs$closeSync })
      return closeSync
    })(fs.closeSync)

    // With gfs4 debugging enabled, log the queue at process exit and assert
    // that nothing is left in it.
    if (/\bgfs4\b/i.test(process.env.NODE_DEBUG || '')) {
      process.on('exit', function () {
        var remaining = fs[gracefulQueue]
        debug(remaining)
        require('assert').equal(remaining.length, 0)
      })
    }
  }
  // Publish the queue on `global` too, unless something is already there.
  if (!global[gracefulQueue]) {
    publishQueue(global, fs[gracefulQueue])
  }

  // Default export: a patched clone, leaving the real fs module as it is.
  module.exports = patch(clone(fs))

  // When TEST_GRACEFUL_FS_GLOBAL_PATCH is set, patch the real fs module
  // itself (once, tracked by fs.__patched) and export that instead.
  if (process.env.TEST_GRACEFUL_FS_GLOBAL_PATCH) {
    if (!fs.__patched) {
      module.exports = patch(fs)
      fs.__patched = true
    }
  }
  // Install the graceful wrappers on the given fs object and return that
  // same object.
  //
  // readFile, writeFile, appendFile (if present), copyFile (if present),
  // readdir and open are replaced by versions that, when the underlying call
  // fails with EMFILE or ENFILE, hand the call to enqueue() for a later retry
  // instead of reporting the error to the caller's callback. Any other
  // outcome is passed straight through to the callback.
  //
  // ReadStream / WriteStream (and the legacy FileReadStream / FileWriteStream
  // names) are exposed as accessor properties whose constructors delegate to
  // the original stream constructors but open their file through the wrapped
  // open().
  function patch (fs) {
    // Everything that references the open() function needs to be in here
    polyfills(fs)
    fs.gracefulify = patch

    fs.createReadStream = createReadStream
    fs.createWriteStream = createWriteStream

    // True when err is one of the two error codes that trigger a retry.
    function isFdLimitError (err) {
      return Boolean(err) && (err.code === 'EMFILE' || err.code === 'ENFILE')
    }

    var fs$readFile = fs.readFile
    fs.readFile = readFile
    function readFile (path, options, cb) {
      // (path, cb) call form: the callback arrived in the options slot
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$readFile(path, options, cb)

      // startTime is only supplied when the call is re-run from the queue
      function go$readFile (path, options, cb, startTime) {
        return fs$readFile(path, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$readFile, [path, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        })
      }
    }

    var fs$writeFile = fs.writeFile
    fs.writeFile = writeFile
    function writeFile (path, data, options, cb) {
      // (path, data, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$writeFile(path, data, options, cb)

      function go$writeFile (path, data, options, cb, startTime) {
        return fs$writeFile(path, data, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$writeFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        })
      }
    }

    // appendFile is only wrapped when the underlying fs provides it
    var fs$appendFile = fs.appendFile
    if (fs$appendFile) {
      fs.appendFile = appendFile
    }
    function appendFile (path, data, options, cb) {
      // (path, data, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      return go$appendFile(path, data, options, cb)

      function go$appendFile (path, data, options, cb, startTime) {
        return fs$appendFile(path, data, options, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$appendFile, [path, data, options, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        })
      }
    }

    // copyFile is only wrapped when the underlying fs provides it
    var fs$copyFile = fs.copyFile
    if (fs$copyFile) {
      fs.copyFile = copyFile
    }
    function copyFile (src, dest, flags, cb) {
      // (src, dest, cb) call form: flags default to 0
      if (typeof flags === 'function') {
        cb = flags
        flags = 0
      }

      return go$copyFile(src, dest, flags, cb)

      function go$copyFile (src, dest, flags, cb, startTime) {
        return fs$copyFile(src, dest, flags, function (err) {
          if (isFdLimitError(err)) {
            enqueue([go$copyFile, [src, dest, flags, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        })
      }
    }

    var fs$readdir = fs.readdir
    fs.readdir = readdir
    // Node versions matching this pattern get readdir called without the
    // options argument.
    var noReaddirOptionVersions = /^v[0-5]\./
    function readdir (path, options, cb) {
      // (path, cb) call form
      if (typeof options === 'function') {
        cb = options
        options = null
      }

      var go$readdir
      if (noReaddirOptionVersions.test(process.version)) {
        go$readdir = function go$readdir (path, options, cb, startTime) {
          return fs$readdir(path, fs$readdirCallback(
            path, options, cb, startTime
          ))
        }
      } else {
        go$readdir = function go$readdir (path, options, cb, startTime) {
          return fs$readdir(path, options, fs$readdirCallback(
            path, options, cb, startTime
          ))
        }
      }

      return go$readdir(path, options, cb)

      // Build the callback handed to the underlying readdir.
      function fs$readdirCallback (path, options, cb, startTime) {
        return function (err, files) {
          if (isFdLimitError(err)) {
            enqueue([
              go$readdir,
              [path, options, cb],
              err,
              startTime || Date.now(),
              Date.now()
            ])
            return
          }

          // results are sorted in place before being handed on
          if (files && files.sort) {
            files.sort()
          }
          if (typeof cb === 'function') {
            cb.call(this, err, files)
          }
        }
      }
    }

    // On node v0.8 the stream constructors come from the legacy-streams module
    if (process.version.substr(0, 4) === 'v0.8') {
      var legStreams = legacy(fs)
      ReadStream = legStreams.ReadStream
      WriteStream = legStreams.WriteStream
    }

    // Inherit from the original stream prototypes, overriding only open()
    var fs$ReadStream = fs.ReadStream
    if (fs$ReadStream) {
      ReadStream.prototype = Object.create(fs$ReadStream.prototype)
      ReadStream.prototype.open = ReadStream$open
    }

    var fs$WriteStream = fs.WriteStream
    if (fs$WriteStream) {
      WriteStream.prototype = Object.create(fs$WriteStream.prototype)
      WriteStream.prototype.open = WriteStream$open
    }

    // The constructors are exposed through accessors; assigning to the
    // property replaces the local binding the getter reads from.
    Object.defineProperty(fs, 'ReadStream', {
      enumerable: true,
      configurable: true,
      get: function () {
        return ReadStream
      },
      set: function (val) {
        ReadStream = val
      }
    })
    Object.defineProperty(fs, 'WriteStream', {
      enumerable: true,
      configurable: true,
      get: function () {
        return WriteStream
      },
      set: function (val) {
        WriteStream = val
      }
    })

    // legacy names
    var FileReadStream = ReadStream
    Object.defineProperty(fs, 'FileReadStream', {
      enumerable: true,
      configurable: true,
      get: function () {
        return FileReadStream
      },
      set: function (val) {
        FileReadStream = val
      }
    })
    var FileWriteStream = WriteStream
    Object.defineProperty(fs, 'FileWriteStream', {
      enumerable: true,
      configurable: true,
      get: function () {
        return FileWriteStream
      },
      set: function (val) {
        FileWriteStream = val
      }
    })

    // Usable with or without `new`; delegates to the original constructor.
    function ReadStream (path, options) {
      if (!(this instanceof ReadStream)) {
        return ReadStream.apply(Object.create(ReadStream.prototype), arguments)
      }
      fs$ReadStream.apply(this, arguments)
      return this
    }

    // open() for read streams: goes through the wrapped open()
    function ReadStream$open () {
      var stream = this
      open(stream.path, stream.flags, stream.mode, function (err, fd) {
        if (err) {
          if (stream.autoClose) {
            stream.destroy()
          }
          stream.emit('error', err)
          return
        }
        stream.fd = fd
        stream.emit('open', fd)
        stream.read()
      })
    }

    // Usable with or without `new`; delegates to the original constructor.
    function WriteStream (path, options) {
      if (!(this instanceof WriteStream)) {
        return WriteStream.apply(Object.create(WriteStream.prototype), arguments)
      }
      fs$WriteStream.apply(this, arguments)
      return this
    }

    // open() for write streams: goes through the wrapped open()
    function WriteStream$open () {
      var stream = this
      open(stream.path, stream.flags, stream.mode, function (err, fd) {
        if (err) {
          stream.destroy()
          stream.emit('error', err)
          return
        }
        stream.fd = fd
        stream.emit('open', fd)
      })
    }

    // Factories look the constructor up on fs at call time, so a constructor
    // assigned through the accessor above is honoured.
    function createReadStream (path, options) {
      return new fs.ReadStream(path, options)
    }

    function createWriteStream (path, options) {
      return new fs.WriteStream(path, options)
    }

    var fs$open = fs.open
    fs.open = open
    function open (path, flags, mode, cb) {
      // (path, flags, cb) call form
      if (typeof mode === 'function') {
        cb = mode
        mode = null
      }

      return go$open(path, flags, mode, cb)

      function go$open (path, flags, mode, cb, startTime) {
        return fs$open(path, flags, mode, function (err, fd) {
          if (isFdLimitError(err)) {
            enqueue([go$open, [path, flags, mode, cb], err, startTime || Date.now(), Date.now()])
            return
          }
          if (typeof cb === 'function') {
            cb.apply(this, arguments)
          }
        })
      }
    }

    return fs
  }
  // Add a job to the shared queue and kick the retry loop.
  // elem is [fn, args, err, startTime, lastTime]: the function to re-run, the
  // arguments to re-run it with, the error that caused the enqueue, and the
  // timestamps of the first and the most recent attempt.
  function enqueue (elem) {
    var job = elem[0]
    var jobArgs = elem[1]
    debug('ENQUEUE', job.name, jobArgs)

    var pending = fs[gracefulQueue]
    pending.push(elem)
    retry()
  }
  // keep track of the timeout between retry() calls
  var retryTimer

  // Stamp every queued job with the current time as both its startTime and
  // its lastTime. That restarts the 60 second overall timeout and the
  // back-off delay between attempts, so the jobs are retried sooner.
  function resetQueue () {
    var stamp = Date.now()
    var pending = fs[gracefulQueue]

    for (var idx = 0; idx < pending.length; idx += 1) {
      var entry = pending[idx]
      // entries that are only a length of 2 are from an older version, don't
      // bother modifying those since they'll be retried anyway.
      if (entry.length > 2) {
        entry[3] = stamp // startTime
        entry[4] = stamp // lastTime
      }
    }

    // call retry to make sure we're actively processing the queue
    retry()
  }
  // Take one job off the front of the shared queue and decide what to do
  // with it: run it, give up on it, or put it back at the end of the queue.
  // Unless the queue was empty on entry, another pass is scheduled afterwards.
  function retry () {
    // clear the timer and remove it to help prevent unintended concurrency
    clearTimeout(retryTimer)
    retryTimer = undefined

    var pending = fs[gracefulQueue]
    if (pending.length === 0) {
      return
    }

    var elem = pending.shift()
    var job = elem[0]
    var jobArgs = elem[1]
    // these items may be unset if they were added by an older graceful-fs
    var lastError = elem[2]
    var firstTriedAt = elem[3]
    var lastTriedAt = elem[4]

    if (firstTriedAt === undefined) {
      // if we don't have a startTime we have no way of knowing if we've waited
      // long enough, so go ahead and retry this item now
      debug('RETRY', job.name, jobArgs)
      job.apply(null, jobArgs)
    } else if (Date.now() - firstTriedAt >= 60000) {
      // it's been more than 60 seconds total, bail now: report the original
      // error to the job's callback (its last argument)
      debug('TIMEOUT', job.name, jobArgs)
      var callback = jobArgs.pop()
      if (typeof callback === 'function') {
        callback.call(null, lastError)
      }
    } else {
      // the amount of time between the last attempt and right now
      var sinceAttempt = Date.now() - lastTriedAt
      // the amount of time between when we first tried, and when we last tried
      // rounded up to at least 1
      var sinceStart = Math.max(lastTriedAt - firstTriedAt, 1)
      // backoff. wait longer than the total time we've been retrying, but only
      // up to a maximum of 100ms
      var desiredDelay = Math.min(sinceStart * 1.2, 100)

      if (sinceAttempt >= desiredDelay) {
        // it's been long enough since the last retry, do it again; the
        // original start time is appended as an extra trailing argument
        debug('RETRY', job.name, jobArgs)
        job.apply(null, jobArgs.concat([firstTriedAt]))
      } else {
        // if we can't do this job yet, push it to the end of the queue
        // and let the next iteration check again
        pending.push(elem)
      }
    }

    // schedule our next run if one isn't already scheduled
    if (retryTimer === undefined) {
      retryTimer = setTimeout(retry, 0)
    }
  }
  392. }