promise实现并发控制和错误重试

本文主要是整理大文件切片上传中关于并发控制和错误重试机制的整理。
主要的场景是这样的:为了减少因为网络波动导致一次上传失败而再次重传的现象,所以将一次文件上传的请求分割成多次切片上传的请求,这里为了快速上传,所以一次可以多发送多次请求,并发上传,但是有一个问题就是并发量过大会对服务器造成负载和阻塞 js 主线程的现象,因此考虑对切片上传要做并发量的控制。还有一个可以优化的点就是,一次切片上传失败并不应该就立即失败,可能是网络波动的问题,因此还要有自动错误重试的功能。

错误重试

一个 promise 在成功之后立即 resolve,在失败之后,等待一段时间后再重试,分析需要几个参数

  • 最大重试次数 maxRetries
  • 延迟时间 delay
  • 包装函数 fn
  • 函数需要的参数 params
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Promise.retry = function (fn, params, delay = 300, maxRetries = 3) {
let retry = 0;
return new Promise((resolve, reject) => {
const executeFn = async () => {
try {
const res = await fn(params);
resolve(res);
} catch (error) {
retry++;
if (retry > maxRetries) reject(error);
else {
// 通过定时调用自身实现定时功能
setTimeout(executeFn, delay);
}
}
};
executeFn();
});
};

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 测试

function fn() {
return new Promise((resolve, reject) => {
reject("11111");
});
}

Promise.retry(fn, 1111)
.then((res) => {
console.log(res);
})
.catch((err) => {
console.log(err);
});

并发控制

要求始终保证在 concurrency 的限制内,需要一个竞速的概念所以要使用 promise.race 控制,最后控制所有的 promise 的结果,需要使用 Promise.all 来限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
function concurrencyControl(fn, params, concurrency = 3) {
return new Promise(async (resolve, reject) => {
const res = [];
const pool = new Set();
for (const param of params) {
// 构造promise
const p = Promise.resolve(fn(param));
// 结果池
res.push(p);
// 连接池
pool.add(p);
// p 执行完成后删除连接池
p.then(() => {
pool.delete(p);
}).catch((err) => {
reject(err);
});
// 始终保证连接数在concurrency之内
if (pool.size === concurrency) {
await Promise.race(pool);
}
}
// 所有的promise执行完毕
const result = await Promise.all(res);
resolve(result);
});
}

二者结合

并发控制 + 错误重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
function concurrencyControl(fn, params, concurrency = 3) {
return new Promise(async (resolve, reject) => {
const res = [];
const pool = new Set();
for (const param of params) {
// 构造promise
const p = Promise.retry(fn, param);
// 结果池
res.push(p);
// 连接池
pool.add(p);
// p 执行完成后删除连接池
p.then(() => {
pool.delete(p);
}).catch((err) => {
reject(err);
});
// 始终保证连接数在concurrency之内
if (pool.size === concurrency) {
await Promise.race(pool);
}
}
// 所有的promise执行完毕
const result = await Promise.all(res);
resolve(result);
});
}

Promise.retry = function (fn, params, delay = 300, maxRetries = 3) {
let retry = 0;
return new Promise((resolve, reject) => {
const executeFn = async () => {
try {
const res = await fn(params);
resolve(res);
} catch (error) {
retry++;
if (retry > maxRetries) reject(error);
else {
// 通过定时调用自身实现定时功能
setTimeout(executeFn, delay);
}
}
};
executeFn();
});
};

promise实现并发控制和错误重试
https://sunburst89757.github.io/2023/02/18/concurrencyControl/
作者
Sunburst89757
发布于
2023年2月18日
许可协议