实现带并发限制的 Promise
问题描述
请实现一个 Scheduler
类,它允许你添加异步任务,并控制这些任务的并发执行数量。
这个类需要满足以下要求:
Scheduler
的构造函数接收一个正整数limit
,表示最大并发执行的任务数。Scheduler
类拥有一个add
方法,该方法接收一个函数promiseCreator
。promiseCreator
函数在被调用时会返回一个 Promise。add
方法本身也应该返回一个 Promise。这个 Promise 应该在promiseCreator
被执行并resolve
后resolve
,其值是promiseCreator
返回的 Promise 的resolve
值。- 同一时间,
Scheduler
最多只能有limit
个任务正在执行。当正在执行的任务数达到limit
时,后续添加的任务需要进入队列等待。当一个任务执行完成后,应立即从队列中取出一个新任务来执行。
示例
示例 1:
假设我们有以下代码:
const timeout = (time) =>
new Promise((resolve) => {
setTimeout(resolve, time);
});
const scheduler = new Scheduler(2);
const addTask = (time, order) => {
scheduler.add(() => timeout(time)).then(() => console.log(order));
};
addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// scheduler 会在 2s 后输出: 2, 3, 1, 4
说明
- 初始状态:
scheduler
的并发限制是 2。任务 1 (1000ms) 和任务 2 (500ms) 被添加,并立即开始执行。此时正在运行的任务数是 2。 - 500ms 后: 任务 2 完成,输出 2。
scheduler
从等待队列中取出任务 3 (300ms) 并开始执行。此时正在运行的任务是任务 1 和任务 3。 - 800ms 后 (500ms + 300ms): 任务 3 完成,输出 3。
scheduler
从等待队列中取出任务 4 (400ms) 并开始执行。此时正在运行的任务是任务 1 和任务 4。 - 1000ms 后: 任务 1 完成,输出 1。此时队列为空,没有新任务可执行。正在运行的只剩下任务 4。
- 1200ms 后 (800ms + 400ms): 任务 4 完成,输出 4。所有任务执行完毕。
- 最终输出顺序: 2, 3, 1, 4
解题思路
核心逻辑
实现并发控制器的关键在于管理任务队列和当前执行任务数计数器,通过状态监控和调度循环确保并发数不超过限制。
状态管理
- 任务队列(queue):
- 用数组存储等待执行的任务,每个任务包装为函数以便调用。
- 执行计数器(runningCount):
- 记录当前正在执行的任务数,初始为 0。
- 并发限制(limit):
- 构造函数中指定最大并发数,如
limit = 2
。
- 构造函数中指定最大并发数,如
add 方法逻辑
- Promise 返回机制:
add
方法返回 Promise,允许外部通过.then()
或await
获取任务结果。
- 任务入队处理:
- 新建 Promise 并将
resolve
与任务函数promiseCreator
存入队列,不直接执行任务。
- 新建 Promise 并将
- 触发调度:
- 调用内部
_runQueue
方法检查是否可执行新任务。
- 调用内部
调度执行(_runQueue)逻辑
- 执行条件:
- 同时满足
runningCount < limit
(有空闲槽位)和queue.length > 0
(有等待任务)。
- 同时满足
- 任务执行流程:
runningCount++
(占用槽位)→ 从队列头部取出任务 → 调用promiseCreator
获得业务 Promise。
- 任务完成处理:
- 监听业务 Promise 完成,无论成败均执行
runningCount--
(释放槽位),并递归调用_runQueue
触发下一次调度,形成循环。
- 监听业务 Promise 完成,无论成败均执行
代码实现
class Scheduler {
/**
* @param {number} limit - 最大并发任务数
*/
constructor(limit) {
this.limit = limit; // 并发限制
this.runningCount = 0; // 当前正在运行的任务数
this.queue = []; // 等待执行的任务队列
}
/**
* 添加一个新的异步任务
* @param {() => Promise<any>} promiseCreator - 一个返回 Promise 的函数
* @returns {Promise<any>} - 该 Promise 会在任务执行后 resolve
*/
add(promiseCreator) {
return new Promise((resolve) => {
// 将任务的执行函数和 promise 的 resolve 方法打包推入队列
this.queue.push({
run: promiseCreator,
resolve,
});
// 尝试执行队列中的任务
this._runQueue();
});
}
/**
* @private
* 内部方法,用于执行队列中的任务
*/
_runQueue() {
// 如果当前运行的任务数小于限制,并且队列中有等待的任务
while (this.runningCount < this.limit && this.queue.length > 0) {
// 取出队列中的第一个任务
const task = this.queue.shift();
if (task) {
this.runningCount++;
// 执行任务(即调用 promiseCreator)
task
.run()
.then((result) => {
// 当任务完成后,通过其对应的 resolve 方法,
// 来 resolve add 方法返回的 promise
task.resolve(result);
})
.catch((error) => {
// 在实际应用中,也应该处理拒绝的情况
// 这里为了简化,我们假设任务总是成功
// 如果需要,可以添加一个 reject 回调
task.resolve(); // 或者 task.reject(error)
})
.finally(() => {
// 任务完成(无论成功或失败),将运行中的任务数减一
this.runningCount--;
// 检查并执行下一个任务
this._runQueue();
});
}
}
}
}
// --- 测试用例 ---
const timeout = (time) =>
new Promise((resolve) => {
setTimeout(() => resolve(time), time);
});
const scheduler = new Scheduler(2);
const addTask = (time, order) => {
scheduler.add(() => timeout(time)).then(() => console.log(order));
};
console.log('开始添加任务...');
addTask(1000, '1');
addTask(500, '2');
addTask(300, '3');
addTask(400, '4');
// 预期输出:
// (大约 500ms 后) 2
// (大约 800ms 后) 3
// (大约 1000ms 后) 1
// (大约 1200ms 后) 4