hikerBatchFetch.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. import DsQueue from './dsQueue.js';
  2. import fastq from "fastq";
  3. import http from 'http';
  4. import https from 'https';
  5. import axios from 'axios';
  /**
   * Per-host socket cap (`maxSockets`) for the keep-alive HTTP/HTTPS agents that the
   * batch fetchers in this module create.
   */
  const batchSockets = 16;

  /**
   * Resolve after roughly `ms` milliseconds (a simple delay helper, e.g. for throttling
   * between requests).
   *
   * @param {number} ms - Delay in milliseconds, forwarded to setTimeout.
   * @returns {Promise<void>} Settles with `undefined` once the timer fires.
   */
  function sleep(ms) {
    // A plain Promise-returning function; `async` added nothing because nothing is awaited.
    return new Promise((resolve) => {
      setTimeout(resolve, ms);
    });
  }
  /**
   * Fetch many URLs as text using a fastq worker queue, processed in fixed-size batches.
   *
   * Items are pushed onto a single fastq queue (concurrency `maxWorkers`) `batchSize` at a
   * time. The next batch is queued only after every request in the current batch has
   * settled, so effective concurrency is at most min(maxWorkers, batchSize). A failed
   * request is logged and recorded as `null` instead of rejecting the whole call.
   *
   * @param {Array<{url: string, options?: object}>} items - Requests to perform. `options`
   *   is spread into the axios request config; `options.method` (default 'GET') and
   *   `options.timeout` override the defaults.
   * @param {number} [maxWorkers=16] - fastq concurrency (fastq requires >= 1).
   * @param {number} [timeoutConfig=5000] - Per-request timeout in ms, used when
   *   `options.timeout` is null/undefined (an explicit 0 is passed through to axios).
   * @param {number} [batchSize=16] - Items per batch. A value below 1 (or NaN) queues
   *   everything as a single batch instead of looping forever.
   * @returns {Promise<Array<string|null>>} Response bodies in input order; `null` where
   *   the request failed. Resolves to `[]` for an empty `items` array.
   */
  export const batchFetch3 = async (items, maxWorkers = 16, timeoutConfig = 5000, batchSize = 16) => {
    if (items.length === 0) {
      return [];
    }
    const startedAt = Date.now();
    // Keep-alive agents capped at `batchSockets` sockets per host; the 30 s socket timeout
    // is meant to reap idle connections.
    const agentOptions = {keepAlive: true, maxSockets: batchSockets, timeout: 30000};
    const httpAgent = new http.Agent(agentOptions);
    // NOTE(review): TLS certificate verification is disabled for every HTTPS request.
    const httpsAgent = new https.Agent({rejectUnauthorized: false, ...agentOptions});
    const _axios = axios.create({httpAgent, httpsAgent});
    const results = new Array(items.length).fill(null);

    // Perform one request. Never rejects: failures are logged and mapped to null.
    const fetchText = async (item) => {
      try {
        const response = await _axios({
          ...item?.options,
          url: item?.url,
          method: item?.options?.method || 'GET',
          timeout: item?.options?.timeout ?? timeoutConfig,
          responseType: 'text',
        });
        return response.data;
      } catch (error) {
        console.log(`[batchFetch][error] ${item?.url}: ${error}`);
        return null;
      }
    };

    // Callback-style fastq worker. It always reports success, so a single failed request
    // cannot stall the queue.
    const worker = async ({item, index}, done) => {
      results[index] = await fetchText(item);
      done(null);
    };

    // Integer step >= 1: a zero/negative/NaN batchSize would otherwise never advance the loop,
    // and a fractional one would produce non-integer result indices.
    const step = batchSize >= 1 ? Math.floor(batchSize) : items.length;
    try {
      // One queue shared by all batches.
      const queue = fastq(worker, maxWorkers);
      for (let start = 0; start < items.length; start += step) {
        const batch = items.slice(start, start + step);
        // Barrier: wait for the whole batch before queueing the next one.
        // (To throttle between batches, an `await sleep(ms)` could be added after this.)
        await Promise.all(
          batch.map((item, offset) => new Promise((resolve) => {
            queue.push({item, index: start + offset}, resolve);
          })),
        );
      }
    } finally {
      // The agents are private to this call; close their pooled sockets instead of leaking them.
      httpAgent.destroy();
      httpsAgent.destroy();
    }
    console.log(`fastq 批量请求 ${items[0]?.url} 等 ${items.length}个地址 耗时${Date.now() - startedAt}毫秒:`);
    return results;
  };
  /**
   * Fetch many URLs as text through a DsQueue.
   *
   * Every item is added to the queue up front, and the call resolves once
   * `queue.onIdle()` resolves. A failed request is logged and recorded as `null`
   * instead of rejecting the whole call.
   *
   * @param {Array<{url: string, options?: object}>} items - Requests to perform. `options`
   *   is spread into the axios request config; `options.method` (default 'GET') and
   *   `options.timeout` override the defaults.
   * @param {number} [maxWorkers=5] - Passed to `new DsQueue(...)`; presumably its
   *   concurrency limit (TODO confirm in dsQueue.js).
   * @param {number} [timeoutConfig=5000] - Per-request timeout in ms, used when
   *   `options.timeout` is null/undefined (an explicit 0 is passed through to axios).
   * @returns {Promise<Array<string|null>>} Response bodies in input order; `null` where
   *   the request failed. Resolves to `[]` for an empty `items` array.
   */
  export const batchFetch4 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
    if (items.length === 0) {
      return [];
    }
    const startedAt = Date.now();
    // Keep-alive agents capped at `batchSockets` sockets per host; the 30 s socket timeout
    // is meant to reap idle connections.
    const agentOptions = {keepAlive: true, maxSockets: batchSockets, timeout: 30000};
    const httpAgent = new http.Agent(agentOptions);
    // NOTE(review): TLS certificate verification is disabled for every HTTPS request.
    const httpsAgent = new https.Agent({rejectUnauthorized: false, ...agentOptions});
    const _axios = axios.create({httpAgent, httpsAgent});
    const results = new Array(items.length).fill(null);

    // Perform one request. Never rejects: failures are logged and mapped to null.
    const fetchText = async (item) => {
      try {
        const response = await _axios({
          ...item?.options,
          url: item?.url,
          method: item?.options?.method || 'GET',
          timeout: item?.options?.timeout ?? timeoutConfig,
          responseType: 'text',
        });
        return response.data;
      } catch (error) {
        console.log(`[batchFetch][error] ${item?.url}: ${error}`);
        return null;
      }
    };

    try {
      // One queue for the whole call.
      const queue = new DsQueue(maxWorkers);
      items.forEach((item, index) => {
        queue.add(async () => {
          results[index] = await fetchText(item);
        });
      });
      // NOTE(review): assumes onIdle() resolves only after every added task has finished —
      // confirm in dsQueue.js, since the agents are destroyed right after.
      await queue.onIdle();
    } finally {
      // The agents are private to this call; close their pooled sockets instead of leaking them.
      httpAgent.destroy();
      httpsAgent.destroy();
    }
    console.log(`DsQueue 批量请求 ${items[0]?.url} 等 ${items.length}个地址 耗时${Date.now() - startedAt}毫秒:`);
    return results;
  };