index.js 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. /*
  2. M3U8 Custom Stream Downloader
  3. Modified from node-m3u8 module by fenton
  4. History:
  5. 2018-02-15 Added callbacks and integration of LiveMe Video Structure
  6. */
  7. const PassThrough = require('stream').PassThrough;
  8. const urlResolve = require('url').resolve;
  9. const miniget = require('./miniget');
  10. const m3u8 = require('./m3u8-parser');
  11. const Queue = require('./queue');
  12. module.exports = (video, options) => {
  13. var stream = new PassThrough();
  14. options = options || {
  15. on_complete: function(e) {},
  16. on_error: function(e) {},
  17. on_progress: function(e) { return { current: 0, total: 0 }}
  18. };
  19. var timeoutTimer = null;
  20. var chunkReadahead = options.chunkReadahead || 3;
  21. var refreshInterval = options.refreshInterval || 600000; // 10 minutes
  22. var requestOptions = options.requestOptions;
  23. var totalItems = 0;
  24. var chunkIndex = 0;
  25. var latestSegment;
  26. var streamQueue = new Queue((segment, callback) => {
  27. latestSegment = segment;
  28. segment.pipe(stream, { end: false });
  29. segment.on('end', callback);
  30. }, { concurrency: 1 });
  31. var requestQueue = new Queue((segmentURL, callback) => {
  32. var segment = miniget(urlResolve(video.hlsvideosource, segmentURL), requestOptions);
  33. segment.on('error', callback);
  34. streamQueue.push(segment, callback);
  35. clearTimeout(timeoutTimer);
  36. timeoutTimer = setTimeout(() => {
  37. onError('Download timeout');
  38. }, 7500);
  39. chunkIndex++;
  40. if (chunkIndex > chunkReadahead)
  41. options.on_progress({
  42. index: chunkIndex,
  43. total: totalItems,
  44. videoid: video.vid
  45. });
  46. }, {
  47. concurrency: chunkReadahead,
  48. unique: (segmentURL) => segmentURL,
  49. });
  50. function onError(err) {
  51. //stream.emit('error', err);
  52. options.on_error({ videoid: video.vid, error: err });
  53. stream.end();
  54. }
  55. // When to look for items again.
  56. var refreshThreshold;
  57. var fetchingPlaylist = false;
  58. var destroyed = false;
  59. var ended = false;
  60. function onQueuedEnd(err) {
  61. if (err) {
  62. onError(err);
  63. } else if (!fetchingPlaylist && !destroyed && !ended &&
  64. requestQueue.tasks.length + requestQueue.active === refreshThreshold) {
  65. refreshPlaylist();
  66. } else if (ended && !requestQueue.tasks.length && !requestQueue.active) {
  67. stream.end();
  68. options.on_complete({ videoid: video.vid, filename: video._filename });
  69. }
  70. }
  71. var tid;
  72. function refreshPlaylist() {
  73. clearTimeout(tid);
  74. fetchingPlaylist = true;
  75. var req = miniget(video.hlsvideosource, requestOptions);
  76. req.on('error', onError);
  77. var parser = req.pipe(new m3u8());
  78. parser.on('tag', (tagName) => {
  79. if (tagName === 'EXT-X-ENDLIST') {
  80. ended = true;
  81. req.unpipe();
  82. clearTimeout(tid);
  83. }
  84. });
  85. parser.on('item', (item) => {
  86. totalItems++;
  87. requestQueue.push(item, onQueuedEnd);
  88. });
  89. parser.on('end', () => {
  90. refreshThreshold = Math.ceil(totalItems * 0.01);
  91. tid = setTimeout(refreshPlaylist, refreshInterval);
  92. fetchingPlaylist = false;
  93. });
  94. }
  95. refreshPlaylist();
  96. stream.end = () => {
  97. destroyed = true;
  98. streamQueue.die();
  99. requestQueue.die();
  100. clearTimeout(tid);
  101. if (latestSegment) { latestSegment.unpipe(); }
  102. PassThrough.prototype.end.call(stream);
  103. };
  104. return stream;
  105. };