特工竞技场
54.28M · 2026-03-23
最近做 CR(Code Review) 时发现:某个接口对同一下游服务发起了两次 Dubbo RPC 调用,但代码是串行等待。如果这些 RPC 之间没有真实依赖,端到端耗时会被“等待链”相加。本文从 Java 工程视角,讨论如何把串行编排升级为可控的并行扇出/汇总,并把超时、线程池隔离与降级兜底一起设计好。
先看一个最常见的串行伪代码:
Result a = rpcA(); // 100ms
Result b = rpcB(); // 120ms
Result c = rpcC(); // 80ms
return combine(a, b, c);
假设每个 RPC 平均 100ms 左右,三个串起来就是 300ms 左右。只要你是串行一步步等结果,整体耗时就是所有调用时间之和。
如果是 10 个 RPC 呢?
哪怕每个只要 80ms,串行下来也接近 800ms,用户基本能感觉到“卡了一下”。
如果你在一个高并发场景(比如大促、秒杀、首页加载)这么干,就很容易把自己卡死在下游调用上。
但有个关键点:很多 RPC 之间其实是互不依赖的。
既然不依赖,为什么一定要一个一个排队调用呢?
最容易想到的优化是:把互不依赖的 RPC 并行化。
假设有 10 个 RPC 调用,其中前 8 个互不依赖,最后 2 个依赖前面 8 个的结果。这其实是一个很典型的“扇出 + 汇总”的调用结构:
如果还是串行写法:
如果你把前 8 个变成并行调用:
同样 10 次调用,耗时直接砍掉三分之二。
很多人在第一次做这类优化时会发现:卡的往往不是下游速度,而是业务代码把“互不依赖”的调用强行串起来了(即同步等待思维)。
知道“要并行”很容易,难的是“线程池/超时/异常语义如何统一”,否则很容易把吞吐换成排队。
常见的几个实现思路:
CompletableFuture、Go goroutine + channel、Node.js Promise/async/await 等)以 Java 为例,用 CompletableFuture 可以很自然地把“互不依赖的一批任务”并行化:
CompletableFuture<ResultA> fa = supplyAsync(this::rpcA, executor);
CompletableFuture<ResultB> fb = supplyAsync(this::rpcB, executor);
CompletableFuture<ResultC> fc = supplyAsync(this::rpcC, executor);
// 等待全部完成
CompletableFuture.allOf(fa, fb, fc).join();
ResultA a = fa.get();
ResultB b = fb.get();
ResultC c = fc.get();
return combine(a, b, c);
调用方写起来还是“同步思路”,但底层实际是并行执行的。
这就是所谓的:用同步风格写异步逻辑。
工程上建议同时注意三点:
supplyAsync/异步执行必须显式传入业务隔离的 executor,避免落入公共线程池导致全局互相影响。allOf(...).join() 只负责“等待全部完成”;真正取结果时按 get()/join() 的异常语义处理(join() 会把异常包装成 CompletionException)。接下来把重点放到“依赖型后续”怎么落地:先把互不依赖的那 8 次 RPC 并行扇出拿到结果,再把这批结果做一次聚合/摘要,最后基于摘要触发依赖那 2 次调用。
工程上再把编排和执行分开即可:应用层负责“依赖编排”,基础设施层负责“并发执行 + 容错兜底”。
// 8 个互不依赖 + 2 个有依赖(依赖 summary 的后续示例)
// 演示用:给每个任务加超时和失败兜底(返回 InfoN.empty() / DepResultN.empty() 之类的默认值)
CompletableFuture<Info1> f1 = CompletableFuture
.supplyAsync(this::rpc1, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info1.empty());
CompletableFuture<Info2> f2 = CompletableFuture
.supplyAsync(this::rpc2, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info2.empty());
CompletableFuture<Info3> f3 = CompletableFuture
.supplyAsync(this::rpc3, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info3.empty());
CompletableFuture<Info4> f4 = CompletableFuture
.supplyAsync(this::rpc4, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info4.empty());
CompletableFuture<Info5> f5 = CompletableFuture
.supplyAsync(this::rpc5, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info5.empty());
CompletableFuture<Info6> f6 = CompletableFuture
.supplyAsync(this::rpc6, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info6.empty());
CompletableFuture<Info7> f7 = CompletableFuture
.supplyAsync(this::rpc7, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info7.empty());
CompletableFuture<Info8> f8 = CompletableFuture
.supplyAsync(this::rpc8, executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> Info8.empty());
// 先汇总前 8 个结果(这一步会等待 f1~f8 全部完成)
CompletableFuture<Summary> summaryFuture = CompletableFuture
.allOf(f1, f2, f3, f4, f5, f6, f7, f8)
.thenApply(v -> buildSummary(
f1.join(), f2.join(), f3.join(), f4.join(),
f5.join(), f6.join(), f7.join(), f8.join()
));
// 再用 summary 触发后面 2 个“依赖型”调用:thenCompose 体现依赖关系
CompletableFuture<DepResult1> d1Future = summaryFuture.thenCompose(s ->
CompletableFuture.supplyAsync(() -> rpcDep1(s), executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> DepResult1.empty())
);
CompletableFuture<DepResult2> d2Future = summaryFuture.thenCompose(s ->
CompletableFuture.supplyAsync(() -> rpcDep2(s), executor)
.orTimeout(200, java.util.concurrent.TimeUnit.MILLISECONDS)
.exceptionally(ex -> DepResult2.empty())
);
// 最后把依赖结果合并成统一返回
return d1Future.thenCombine(d2Future, this::combineFinal).join();
补充说明:上面示例里的 orTimeout 更准确地说是“让等待 future 的调用方快速返回一个超时完成结果”。它不等价于一定会在底层把正在执行的 RPC 立刻停掉;如果你的 RPC 框架支持取消/中断,建议把超时联动到 RPC 层的取消与资源释放,避免后台任务继续占用连接、线程或下游配额。
另外,exceptionally(...) 会把异常转换成默认值并“吞掉”部分堆栈信息;如果你把它当作降级兜底,务必在兜底分支里同步记录日志/指标/链路,方便定位尾延迟来源。
很多人一听并行,就想“能并行的全并行”。但实战中有三个常见坑:
补一句:并行也有开销。线程切换、任务调度、连接池争用都会带来额外成本,甚至可能让尾延迟(P99/P999)变差,所以要同时看平均耗时和延迟分布。
并行任务通常要落在“有界线程池”:合理设置 core/maxPoolSize、有界队列容量和拒绝策略;同时对“下游并发度”再加一层控制(例如 Semaphore/限流令牌),避免峰值把公共资源打爆。
坑一:忽略下游的承载能力
坑二:线程池乱配,抢占宝贵资源
坑三:超时与降级没设计好
补一句工程经验:并行优化通常先改善平均耗时,但更要盯住 P99/P999 的尾延迟,否则高峰期会出现“越并越慢”的反噬。
所以实践标准可以简单记为:
很多同学写接口时,只是把它当作“写一段业务逻辑”。
但在高并发系统里,你其实是在设计调用拓扑:
当你用“调用拓扑图”的视角看一个接口时,会发现:
以后你在 Code Review 里遇到这种“看起来没问题但就是串着等”的调用链,可以按这个顺序快速检查:
写代码时把“等待关系”画清楚、问对问题,你的系统就会更快也更稳。
本文使用 markdown.com.cn 排版