优雅的控制并发

先面试或者实际开发的需求中,我们常常会碰到需要控制并发量的需求,那么如何才能优雅的去实现这个需求呢,下面简单介绍几种实现方式。

比方说我们有个需求是,需要去请求多个请求,但是并发的请求量不得超过5.

1
2

const requests = [request1, request2, request3, request4,request5, request6, request7, request8];

方式一

简单通过分组来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 切分数组
function chunk(array, size = 1) {
size = Math.max(Number.parseInt(size), 0)
const length = array == null ? 0 : array.length
if (!length || size < 1) {
return []
}
let index = 0
let resIndex = 0
const result = new Array(Math.ceil(length / size))

while (index < length) {
result[resIndex++] = slice(array, index, (index += size))
}
return result
}

async function concurrentRequest(requests, limit) {
const chunks = chunk(requests, limit);
for (let i = 0; i < chunks.length; i++) {
const chunk = chunks[i];
await Promise.all(chunk.map((request) => request()));
}
}

但是这样有个问题,虽然限制了最大的并发量,但是每次得等每个分片的请求都结束了才能进入到下一个循环,这样就不能最大效率的利用并发量了。

Semaphore

为了最大化的利用并发量,我们可以用一个叫做Semaphore的类,像java,c++,c#经常会使用它来实现并发量的控制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

class Semaphore {
resolves = [];
limit = 0;
cur = 0;
constructor(limit) {
this.limit = limit;
}

acquire() {
return new Promise((resolve) => {
this.cur++;
if (this.cur < this.limit) {
resolve();
} else {
this.resolves.push(resolve);
}
});
}

release() {
this.cur --;
if (this.resolves.length > 0) {
this.resolves.shift()();
}
}
}

async function concurrentRequest(requests, limit) {
const semaphore = new Semaphore(limit);
await Promise.all(requests.map(async (request) => {
await semaphore.acquire();
await request().finally(() => {
semaphore.release();
})
}))
}

这样相当于形成了一个并发池,先进入到池子里的方法会执行,当池子满了后,后面进入的方法,就会等待池子释放信号才能进入,先进入的方法在执行完后,调用 semaphore.release 通知池子有一个空位了,后面的方法通过 await semaphore.acquire 来接收这个信号并继续执行,我们可以看到这个用法的代码也是非常的简单直观,并且保证了并发量一直维持在这没有浪费