drpyBatchFetch.js 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import PQueue from 'p-queue';
  2. import Queue from 'queue';
  3. import http from 'http';
  4. import https from 'https';
  5. import axios from 'axios';
  6. const batchSockets = 16;
  7. export const batchFetch1 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
  8. let t1 = (new Date()).getTime();
  9. const AgentOption = {keepAlive: true, maxSockets: batchSockets, timeout: 30000}; // 最大连接数64,30秒定期清理空闲连接
  10. // const AgentOption = {keepAlive: true};
  11. const httpAgent = new http.Agent(AgentOption);
  12. const httpsAgent = new https.Agent({rejectUnauthorized: false, ...AgentOption});
  13. // 配置 axios 使用代理
  14. const _axios = axios.create({
  15. httpAgent, // 用于 HTTP 请求的代理
  16. httpsAgent, // 用于 HTTPS 请求的代理
  17. });
  18. const queue = new PQueue({concurrency: maxWorkers});
  19. // 获取全局 timeout 设置
  20. const timeout = timeoutConfig;
  21. // 遍历 items 并生成任务队列
  22. const promises = items.map((item) => {
  23. return queue.add(async () => {
  24. try {
  25. const response = await _axios(
  26. Object.assign({}, item?.options, {
  27. url: item.url,
  28. method: item?.options?.method || 'GET',
  29. timeout: item?.options?.timeout || timeout,
  30. responseType: 'text',
  31. }),
  32. );
  33. return response.data;
  34. } catch (error) {
  35. console.log(`[batchFetch][error] ${item.url}: ${error}`);
  36. return null;
  37. }
  38. });
  39. });
  40. const results = await Promise.all(promises);
  41. let t2 = (new Date()).getTime();
  42. log(`PQueue 批量请求 ${items[0].url} 等 ${items.length}个地址 耗时${t2 - t1}毫秒:`);
  43. // 执行所有任务
  44. return results
  45. };
  46. export const batchFetch2 = async (items, maxWorkers = 5, timeoutConfig = 5000) => {
  47. let t1 = (new Date()).getTime();
  48. const AgentOption = {keepAlive: true, maxSockets: batchSockets, timeout: 30000}; // 最大连接数64,30秒定期清理空闲连接
  49. // const AgentOption = {keepAlive: true};
  50. const httpAgent = new http.Agent(AgentOption);
  51. const httpsAgent = new https.Agent({rejectUnauthorized: false, ...AgentOption});
  52. // 配置 axios 使用代理
  53. const _axios = axios.create({
  54. httpAgent, // 用于 HTTP 请求的代理
  55. httpsAgent, // 用于 HTTPS 请求的代理
  56. });
  57. const queue = new Queue({concurrency: maxWorkers, autostart: true});
  58. // 获取全局 timeout 设置
  59. const timeout = timeoutConfig;
  60. const results = [];
  61. const promises = [];
  62. items.forEach((item, index) => {
  63. promises.push(
  64. new Promise((resolve) => {
  65. queue.push(async () => {
  66. try {
  67. const response = await _axios(
  68. Object.assign({}, item?.options, {
  69. url: item.url,
  70. method: item?.options?.method || 'GET',
  71. timeout: item?.options?.timeout || timeout,
  72. responseType: 'text',
  73. }),
  74. );
  75. results[index] = response.data;
  76. resolve();
  77. } catch (error) {
  78. console.log(`[batchFetch][error] ${item.url}: ${error}`);
  79. results[index] = null;
  80. resolve();
  81. }
  82. });
  83. }),
  84. );
  85. });
  86. await Promise.all(promises);
  87. let t2 = (new Date()).getTime();
  88. log(`Queue 批量请求 ${items[0].url} 等 ${items.length}个地址 耗时${t2 - t1}毫秒:`);
  89. return results;
  90. };