实现带并发限制的 Promise

问题描述

请实现一个 Scheduler 类,它允许你添加异步任务,并控制这些任务的并发执行数量。

这个类需要满足以下要求:

  1. Scheduler 的构造函数接收一个正整数 limit,表示最大并发执行的任务数。
  2. Scheduler 类拥有一个 add 方法,该方法接收一个函数 promiseCreatorpromiseCreator 函数在被调用时会返回一个 Promise。
  3. add 方法本身也应该返回一个 Promise。这个 Promise 应该在 promiseCreator 被执行并 resolveresolve,其值是 promiseCreator 返回的 Promise 的 resolve 值。
  4. 同一时间,Scheduler 最多只能有 limit 个任务正在执行。当正在执行的任务数达到 limit 时,后续添加的任务需要进入队列等待。当一个任务执行完成后,应立即从队列中取出一个新任务来执行。

示例

示例 1:

假设我们有以下代码:

const timeout = (time) =>
  new Promise((resolve) => {
    setTimeout(resolve, time);
  });

const scheduler = new Scheduler(2);

const addTask = (time, order) => {
  scheduler.add(() => timeout(time)).then(() => console.log(order));
};

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');

// scheduler 会在 2s 后输出: 2, 3, 1, 4

说明

  • 初始状态scheduler 的并发限制是 2。任务 1 (1000ms) 和任务 2 (500ms) 被添加,并立即开始执行。此时正在运行的任务数是 2。
  • 500ms 后: 任务 2 完成,输出 2。scheduler 从等待队列中取出任务 3 (300ms) 并开始执行。此时正在运行的任务是任务 1 和任务 3。
  • 800ms 后 (500ms + 300ms): 任务 3 完成,输出 3。scheduler 从等待队列中取出任务 4 (400ms) 并开始执行。此时正在运行的任务是任务 1 和任务 4。
  • 1000ms 后: 任务 1 完成,输出 1。此时队列为空,没有新任务可执行。正在运行的只剩下任务 4。
  • 1200ms 后 (800ms + 400ms): 任务 4 完成,输出 4。所有任务执行完毕。
  • 最终输出顺序: 2, 3, 1, 4

解题思路

核心逻辑

实现并发控制器的关键在于管理任务队列当前执行任务数计数器,通过状态监控和调度循环确保并发数不超过限制。

状态管理

  1. 任务队列(queue)
    • 用数组存储等待执行的任务,每个任务包装为函数以便调用。
  2. 执行计数器(runningCount)
    • 记录当前正在执行的任务数,初始为 0。
  3. 并发限制(limit)
    • 构造函数中指定最大并发数,如 limit = 2

add 方法逻辑

  1. Promise 返回机制
    • add 方法返回 Promise,允许外部通过 .then()await 获取任务结果。
  2. 任务入队处理
    • 新建 Promise 并将 resolve 与任务函数 promiseCreator 存入队列,不直接执行任务。
  3. 触发调度
    • 调用内部 _runQueue 方法检查是否可执行新任务。

调度执行(_runQueue)逻辑

  1. 执行条件
    • 同时满足 runningCount < limit(有空闲槽位)和 queue.length > 0(有等待任务)。
  2. 任务执行流程
    • runningCount++(占用槽位)→ 从队列头部取出任务 → 调用 promiseCreator 获得业务 Promise。
  3. 任务完成处理
    • 监听业务 Promise 完成,无论成败均执行 runningCount--(释放槽位),并递归调用 _runQueue 触发下一次调度,形成循环。

代码实现

class Scheduler {
  /**
   * @param {number} limit - 最大并发任务数
   */
  constructor(limit) {
    this.limit = limit; // 并发限制
    this.runningCount = 0; // 当前正在运行的任务数
    this.queue = []; // 等待执行的任务队列
  }

  /**
   * 添加一个新的异步任务
   * @param {() => Promise<any>} promiseCreator - 一个返回 Promise 的函数
   * @returns {Promise<any>} - 该 Promise 会在任务执行后 resolve
   */
  add(promiseCreator) {
    return new Promise((resolve) => {
      // 将任务的执行函数和 promise 的 resolve 方法打包推入队列
      this.queue.push({
        run: promiseCreator,
        resolve,
      });
      // 尝试执行队列中的任务
      this._runQueue();
    });
  }

  /**
   * @private
   * 内部方法,用于执行队列中的任务
   */
  _runQueue() {
    // 如果当前运行的任务数小于限制,并且队列中有等待的任务
    while (this.runningCount < this.limit && this.queue.length > 0) {
      // 取出队列中的第一个任务
      const task = this.queue.shift();
      if (task) {
        this.runningCount++;

        // 执行任务(即调用 promiseCreator)
        task
          .run()
          .then((result) => {
            // 当任务完成后,通过其对应的 resolve 方法,
            // 来 resolve add 方法返回的 promise
            task.resolve(result);
          })
          .catch((error) => {
            // 在实际应用中,也应该处理拒绝的情况
            // 这里为了简化,我们假设任务总是成功
            // 如果需要,可以添加一个 reject 回调
            task.resolve(); // 或者 task.reject(error)
          })
          .finally(() => {
            // 任务完成(无论成功或失败),将运行中的任务数减一
            this.runningCount--;
            // 检查并执行下一个任务
            this._runQueue();
          });
      }
    }
  }
}

// --- 测试用例 ---

const timeout = (time) =>
  new Promise((resolve) => {
    setTimeout(() => resolve(time), time);
  });

const scheduler = new Scheduler(2);

const addTask = (time, order) => {
  scheduler.add(() => timeout(time)).then(() => console.log(order));
};

console.log('开始添加任务...');

addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');

// 预期输出:
// (大约 500ms 后) 2
// (大约 800ms 后) 3
// (大约 1000ms 后) 1
// (大约 1200ms 后) 4
Copyright © Jun 2025 all right reserved,powered by Gitbook该文件修订时间: 2025-07-03 17:35:08

results matching ""

    No results matching ""

    results matching ""

      No results matching ""