0%

Asyncpool

并发控制

控制并发数量,正在执行与待执行的有效分离队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
poolLimit 限制并发数
array 任务
iteratorFn 迭代回调 用于实现对每个任务项进行处理,该函数会返回一个 Promise 对象或异步函数。

// 循环任务 创建异步任务,返回Promise 进行保存新的异步任务
// 并发数小于实际任务数,进行并发控制
// 执行后,移除已完成的任务
// 保存执行中的任务
// 执行中的大于限制任务 就进行Race
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = [];
const executing = [];
for (const item of array) {
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);

if (poolLimit <= array.length) {
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
await Promise.race(executing);
}
}
}
return Promise.all(ret);
}

let timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 3000, 2000], timeout);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
function fetch(url) {
// 模拟接口请求
return new Promise((resolve) => {
setTimeout(() => {
resolve(url);
}, 1000);
});
}

/**
* 接口请求最大并发量控制
* @param { Array } urls 接口请求地址数组集合
* @param { Number } max 最大并发量
* @param { Function } callback 回调函数
*/
function maxRequestLimit(arr, max, callback) {
// 如果没有传入urls或max则不继续执行
if (!arr || !max) return;

// 当请求地址数组集合长度为0,则执行回调函数(如果有的话),并结束后续执行
if (arr.length === 0) {
if (callback) callback();
return;
}

let fetchArr = [], // 存储并发max的promise数组
i = 0;

// 1.fetch
// 2.执行并且放入存储队列
// 3.判别并行数量与任务最大限制 如果超过通过race执行第一个完成的
// 4.调用下函数自身
function toFetch() {
// 所有的请求都受理,则返回一个resolve
if (i === arr.length) return Promise.resolve();

// 取出第i个url, 放入fetch里面 , 每取一次i++
let one = fetch(arr[i++]);

//将当前的promise存入并发数组中
fetchArr.push(one);

// 当promise执行完毕后,从数组删除
one.then((res) => {
console.log(res);
fetchArr.splice(fetchArr.indexOf(one), 1);
});

let p = Promise.resolve();

// 当并行数量达到最大后, 用race比较 第一个完成的, 然后再调用一下函数自身。
if (fetchArr.length >= max) p = Promise.race(fetchArr);

return p.then(() => toFetch());
}

// fetchArr循环完后,现在fetchArr里面剩下的promise对象, 使用all等待所有的都完成之后执行callback
toFetch()
.then(() => Promise.all(fetchArr))
.then(() => callback());
}
maxRequestLimit([1, 2, 3, 4, 5, 7, 8, 9], 3, () => {
console.log("fetch end");
});