batchExecute.js 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. import fastq from 'fastq';
  2. /**
  3. * Batch execution function with concurrency control and early termination.
  4. * @param {Array} tasks - Array of task objects, each containing func, param, and id.
  5. * @param {Object} listener - Progress listener object containing func and param.
  6. * @param {number} [successCount] - Number of successful tasks to wait for before stopping.
  7. * @param {number} max_task - Maximum number of concurrent tasks.
  8. * @returns {Promise<Array>} - Resolves with the successful results when the required tasks are complete.
  9. */
  10. async function batchExecute(tasks, listener, successCount, max_task = 0) {
  11. const maxConcurrency = Number(max_task) || Number(process.env.MAX_TASK) || 2; // Default concurrency
  12. // console.log(`batchExecute with max_task: ${maxConcurrency}`);
  13. let completedSuccess = 0;
  14. let stopExecution = false;
  15. const successfulResults = []; // To store successful results
  16. const queue = fastq.promise(async (task) => {
  17. if (stopExecution) return; // Skip processing if execution has stopped
  18. const {func, param, id} = task;
  19. try {
  20. // Check for stop condition at the start of each task
  21. if (stopExecution) return;
  22. const result = await func({...param, stopExecution: () => stopExecution});
  23. if (stopExecution) return; // Check again after task execution
  24. // if (result && result.url) { // Success condition
  25. successfulResults.push(result);
  26. completedSuccess++;
  27. // }
  28. if (listener && typeof listener.func === 'function') {
  29. const listenerResult = listener.func(listener.param, id, null, result);
  30. if (listenerResult === 'break') {
  31. stopExecution = true;
  32. }
  33. }
  34. if (successCount && completedSuccess >= successCount) {
  35. stopExecution = true;
  36. }
  37. } catch (error) {
  38. if (listener && typeof listener.func === 'function') {
  39. listener.func(listener.param, id, error, null);
  40. }
  41. }
  42. }, maxConcurrency);
  43. // Enqueue tasks with a stop check
  44. tasks.forEach((task) => {
  45. queue.push(task).catch((err) => {
  46. console.error(`Task queue error for task ${task.id}:`, err);
  47. });
  48. });
  49. // Monitor the queue and clear it on stopExecution
  50. const stopMonitor = new Promise((resolve) => {
  51. const interval = setInterval(() => {
  52. if (stopExecution) {
  53. queue.kill(); // Clear all pending tasks
  54. clearInterval(interval);
  55. resolve();
  56. }
  57. }, 50); // Check every 50ms
  58. });
  59. // Wait for either stopExecution or all tasks to finish
  60. await Promise.race([queue.drained(), stopMonitor]);
  61. console.log(`batchExecute completed with max_task: ${maxConcurrency} and ${completedSuccess} successful tasks.`);
  62. return successfulResults;
  63. }
  64. export default batchExecute;