// chunk.js

import req from './req.js';
import CryptoJS from 'crypto-js';
import {join} from 'path';
import fs from 'fs';
import {PassThrough} from 'stream';
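
// Probe `url` with a one-byte Range request (bytes=0-0) to see whether the
// upstream server supports HTTP range requests. Returns [isSupport, headers],
// with the full content length recovered from the Content-Range total and put
// back into headers['content-length']; returns [false, null] on a non-206 reply.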
export async function testSupport(url, headers) {
    const resp = await req
        .get(url, {
            responseType: 'stream',
            headers: Object.assign(
                {
                    Range: 'bytes=0-0',
                },
                headers,
            ),
        })
        .catch((err) => {
            console.error(err);
            return err.response || {status: 500, data: {}};
        });
    if (resp && resp.status === 206) {
        const isAccept = resp.headers['accept-ranges'] === 'bytes';
        const contentRange = resp.headers['content-range'];
        const contentLength = parseInt(resp.headers['content-length']);
        const isSupport = isAccept || !!contentRange || contentLength === 1;
        const length = contentRange ? parseInt(contentRange.split('/')[1]) : contentLength;
        delete resp.headers['content-range'];
        delete resp.headers['content-length'];
        if (length) resp.headers['content-length'] = length.toString();
        return [isSupport, resp.headers];
    } else {
        return [false, null];
    }
}
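
// Per-URL header cache, the currently active cache key, the on-disk cache
// root, and the cap (100 MB) on how far downloads may run ahead of the reader.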
const urlHeadCache = {};
let currentUrlKey = '';
const cacheRoot = (process.env['NODE_PATH'] || '.') + '/vod_cache';
const maxCache = 1024 * 1024 * 100;

// Remove cached files for every URL key except keepKey. Completed '.p' chunk
// files are left alone here; the reader deletes those itself after consuming them.
function delAllCache(keepKey) {
    try {
        fs.readdir(cacheRoot, (_, files) => {
            if (files)
                for (const file of files) {
                    if (file === keepKey) continue;
                    const dir = join(cacheRoot, file);
                    fs.stat(dir, (_, stats) => {
                        if (stats && stats.isDirectory()) {
                            fs.readdir(dir, (_, subFiles) => {
                                if (subFiles)
                                    for (const subFile of subFiles) {
                                        if (!subFile.endsWith('.p')) {
                                            fs.rm(join(dir, subFile), {recursive: true}, () => {});
                                        }
                                    }
                            });
                        }
                    });
                }
        });
    } catch (error) {
        console.error(error);
    }
}
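
// Stream `url` back to the client in range-aware fashion: probe the upstream
// once, then download fixed-size chunks with a small worker pool, cache them
// on disk, and replay them in order through a PassThrough. Falls back to a
// plain redirect when the upstream does not support range requests.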
async function chunkStream(inReq, outResp, url, urlKey, headers, option) {
    urlKey = urlKey || CryptoJS.enc.Hex.stringify(CryptoJS.MD5(url)).toString();
    if (currentUrlKey !== urlKey) {
        delAllCache(urlKey);
        currentUrlKey = urlKey;
    }
    if (!urlHeadCache[urlKey]) {
        const [isSupport, urlHeader] = await testSupport(url, headers);
        if (!isSupport || !urlHeader['content-length']) {
            console.log(`[chunkStream] failed to get content-length, redirecting to: ${url}`);
            outResp.redirect(url);
            return;
        }
        urlHeadCache[urlKey] = urlHeader;
    }
    let exist = true;
    await fs.promises.access(join(cacheRoot, urlKey)).catch((_) => (exist = false));
    if (!exist) {
        await fs.promises.mkdir(join(cacheRoot, urlKey), {recursive: true});
    }
    const contentLength = parseInt(urlHeadCache[urlKey]['content-length']);
    let byteStart = 0;
    let byteEnd = contentLength - 1;
    const streamHeader = {};
    if (inReq.headers.range) {
        // Parse "bytes=start-end"; the end position is optional.
        const ranges = inReq.headers.range.trim().split(/=|-/);
        if (ranges.length > 2 && ranges[2]) {
            byteEnd = parseInt(ranges[2]);
        }
        byteStart = parseInt(ranges[1]);
        Object.assign(streamHeader, urlHeadCache[urlKey]);
        streamHeader['content-length'] = (byteEnd - byteStart + 1).toString();
        streamHeader['content-range'] = `bytes ${byteStart}-${byteEnd}/${contentLength}`;
        outResp.code(206);
    } else {
        Object.assign(streamHeader, urlHeadCache[urlKey]);
        outResp.code(200);
    }
    option = option || {chunkSize: 1024 * 256, poolSize: 5, timeout: 1000 * 10};
    console.log(`[chunkStream] option: `, option);
    const chunkSize = option.chunkSize;
    const poolSize = option.poolSize;
    const timeout = option.timeout;
    let chunkCount = Math.ceil(contentLength / chunkSize);
    let chunkDownIdx = Math.floor(byteStart / chunkSize);
    let chunkReadIdx = chunkDownIdx;
    let stop = false;
    const dlFiles = {};
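
    // Download pool: up to poolSize workers run concurrently. Each worker
    // claims the next chunk index, fetches that byte range into the cache
    // directory, and loops until every chunk is done or the stream closes.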
    for (let i = 0; i < poolSize && i < chunkCount; i++) {
        new Promise((resolve) => {
            (async function doDLTask(spChunkIdx) {
                if (stop || chunkDownIdx >= chunkCount) {
                    resolve();
                    return;
                }
                // Throttle: never run more than maxCache bytes ahead of the reader.
                if (spChunkIdx === undefined && (chunkDownIdx - chunkReadIdx) * chunkSize >= maxCache) {
                    setTimeout(doDLTask, 5);
                    return;
                }
                // spChunkIdx is only set when retrying a failed chunk; compare against
                // undefined explicitly so a retry of chunk 0 is not treated as a new claim.
                const chunkIdx = spChunkIdx === undefined ? chunkDownIdx++ : spChunkIdx;
                const taskId = `${inReq.id}-${chunkIdx}`;
                try {
                    const dlFile = join(cacheRoot, urlKey, `${inReq.id}-${chunkIdx}.p`);
                    let exist = true;
                    await fs.promises.access(dlFile).catch((_) => (exist = false));
                    if (!exist) {
                        const start = chunkIdx * chunkSize;
                        const end = Math.min(contentLength - 1, (chunkIdx + 1) * chunkSize - 1);
                        const dlResp = await req.get(url, {
                            responseType: 'stream',
                            timeout: timeout,
                            headers: Object.assign(
                                {
                                    Range: `bytes=${start}-${end}`,
                                },
                                headers,
                            ),
                        });
                        // Write to a '.dl' temp file, then rename to '.p' once complete.
                        const dlCache = join(cacheRoot, urlKey, `${inReq.id}-${chunkIdx}.dl`);
                        const writer = fs.createWriteStream(dlCache);
                        const readTimeout = setTimeout(() => {
                            writer.destroy(new Error(`${taskId} read timeout`));
                        }, timeout);
                        const downloaded = new Promise((resolve) => {
                            writer.on('finish', async () => {
                                if (stop) {
                                    await fs.promises.rm(dlCache).catch((e) => console.error(e));
                                } else {
                                    await fs.promises.rename(dlCache, dlFile).catch((e) => console.error(e));
                                    dlFiles[taskId] = dlFile;
                                }
                                resolve(true);
                            });
                            writer.on('error', async (e) => {
                                console.error(e);
                                await fs.promises.rm(dlCache).catch((e1) => console.error(e1));
                                resolve(false);
                            });
                        });
                        dlResp.data.pipe(writer);
                        const result = await downloaded;
                        clearTimeout(readTimeout);
                        if (!result) {
                            // Retry the same chunk after a short pause.
                            setTimeout(() => {
                                doDLTask(chunkIdx);
                            }, 15);
                            return;
                        }
                    }
                    setTimeout(doDLTask, 5);
                } catch (error) {
                    console.error(error);
                    setTimeout(() => {
                        doDLTask(chunkIdx);
                    }, 15);
                }
            })();
        });
    }
    outResp.headers(streamHeader);
    const stream = new PassThrough();
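
    // Reader: wait for chunk chunkReadIdx to appear on disk, copy the bytes the
    // client asked for into the PassThrough (honoring backpressure via 'drain'),
    // delete the chunk file, then advance to the next chunk.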
    new Promise((resolve) => {
        let writeMore = true;
        (async function waitReadFile() {
            try {
                if (chunkReadIdx >= chunkCount || stop) {
                    stream.end();
                    resolve();
                    return;
                }
                if (!writeMore) {
                    setTimeout(waitReadFile, 5);
                    return;
                }
                const taskId = `${inReq.id}-${chunkReadIdx}`;
                if (!dlFiles[taskId]) {
                    setTimeout(waitReadFile, 5);
                    return;
                }
                const chunkByteStart = chunkReadIdx * chunkSize;
                const chunkByteEnd = Math.min(contentLength - 1, (chunkReadIdx + 1) * chunkSize - 1);
                // Only the first chunk may start mid-file (when byteStart falls inside it).
                const readFileStart = Math.max(byteStart, chunkByteStart) - chunkByteStart;
                const dlFile = dlFiles[taskId];
                delete dlFiles[taskId];
                const fd = await fs.promises.open(dlFile, 'r');
                const buffer = Buffer.alloc(chunkByteEnd - chunkByteStart - readFileStart + 1);
                await fd.read(buffer, 0, chunkByteEnd - chunkByteStart - readFileStart + 1, readFileStart);
                await fd.close().catch((e) => console.error(e));
                await fs.promises.rm(dlFile).catch((e) => console.error(e));
                writeMore = stream.write(buffer);
                if (!writeMore) {
                    stream.once('drain', () => {
                        writeMore = true;
                    });
                }
                chunkReadIdx++;
                setTimeout(waitReadFile, 5);
            } catch (error) {
                setTimeout(waitReadFile, 5);
            }
        })();
    });
    stream.on('close', async () => {
        // Client went away: drop any chunk files still queued and stop the pool.
        Object.keys(dlFiles).forEach((reqKey) => {
            if (reqKey.startsWith(inReq.id)) {
                fs.rm(dlFiles[reqKey], {recursive: true}, () => {});
                delete dlFiles[reqKey];
            }
        });
        stop = true;
    });
    return stream;
}

export default chunkStream;
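
// A minimal usage sketch, assuming a fastify-style server (the inReq/outResp
// calls above match fastify's request/reply API: request.id, reply.code(),
// reply.headers(), reply.redirect()). The route path and upstream URL below
// are hypothetical, for illustration only:
//
//   import Fastify from 'fastify';
//   import chunkStream from './chunk.js';
//
//   const app = Fastify();
//   app.get('/proxy/video', async (request, reply) => {
//       // hypothetical upstream media URL
//       const url = 'https://example.com/video.mp4';
//       const stream = await chunkStream(request, reply, url, null, {});
//       if (stream) reply.send(stream); // otherwise a redirect was already sent
//       return reply;
//   });
//   app.listen({port: 3000});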