Reactor非阻塞聚合Flux为Mono的方法
“纵有疾风来,人生不言弃”,这句话送给正在学习文章的朋友们,也希望在阅读本文《Reactor中非阻塞地聚合多个Flux结果为单个Mono的策略 》后,能够真的帮助到大家。我也会在后续的文章中,陆续更新文章相关的技术文章,有好的建议欢迎大家在评论留言,非常感谢!

本文旨在探讨如何在Project Reactor框架中,以非阻塞的方式将两个独立的`Flux`数据流的聚合结果合并为一个单一的`Mono`对象。通过分析传统阻塞方法的不足,文章将重点介绍`Mono.zipWith`操作符及其与`Flux.collectList()`的结合使用,以构建一个完全响应式、高效且易于维护的数据聚合解决方案,并提供详细的代码示例和最佳实践建议。
引言:响应式数据流聚合的挑战
在现代异步编程中,尤其是在基于Project Reactor等响应式框架构建的系统中,我们经常面临需要从多个独立的异步源获取数据,并将这些数据聚合成一个单一的、结构化的复合对象的场景。例如,从不同的服务或数据库查询成功账户列表和失败账户列表,然后将它们合并到一个支付结果对象中。
这种聚合操作的关键在于保持整个处理流程的非阻塞性。如果在此过程中引入任何阻塞操作,将可能导致线程资源浪费、系统吞吐量下降,并违背响应式编程的核心理念。
领域模型概览
为了更好地理解问题和解决方案,我们首先定义一个示例领域模型Payments,它包含成功账户和失败账户的列表:
package org.example;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;
import java.util.List;
@Getter
@Builder
@ToString
public class Payments {
private List<SuccessAccount> successAccounts;
private List<FailedAccount> failedAccounts;
@Getter
@Builder
@ToString
public static class SuccessAccount {
private String name;
private String accountNumber;
}
@Getter
@Builder
@ToString
public static class FailedAccount {
private String name;
private String accountNumber;
private String errorCode;
}
}我们的目标是获取两个独立的Flux流(一个产生SuccessAccount,另一个产生FailedAccount),然后将它们各自收集成列表,最终封装进一个Payments对象,并且整个过程是非阻塞的。
问题剖析:为何传统方法会阻塞
初学者在尝试聚合多个响应式流时,可能会不自觉地引入阻塞操作。考虑以下尝试将两个Flux收集为列表并构建Payments对象的代码:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
getPaymentData().subscribe(System.out::println);
}
public static Mono<Payments> getPaymentData() {
Flux<Payments.SuccessAccount> accountsSucceeded = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailed = getAccountsFailed();
List<Payments.SuccessAccount> successAccounts = new ArrayList<>();
List<Payments.FailedAccount> failedAccounts = new ArrayList<>();
// 这里的subscribe调用是问题所在
accountsFailed.collectList().subscribe(failedAccounts::addAll); // 阻塞或导致竞态条件
accountsSucceeded.collectList().subscribe(successAccounts::addAll); // 阻塞或导致竞态条件
return Mono.just(Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build());
}
// ... getAccountsSucceeded() 和 getAccountsFailed() 方法省略 ...
}上述代码段中,accountsFailed.collectList().subscribe(failedAccounts::addAll); 和 accountsSucceeded.collectList().subscribe(successAccounts::addAll); 是问题的根源。
- subscribe() 的作用: subscribe() 是一个终端操作,它会触发响应式流的执行。然而,它本身是异步的,意味着当subscribe()被调用时,流的元素并不会立即被收集到failedAccounts或successAccounts列表中。
- 打破响应式链: 在subscribe()调用之后,紧接着的return Mono.just(...)会立即执行。此时,failedAccounts和successAccounts列表很可能仍然是空的,因为它们的填充是在异步的subscribe回调中进行的。这导致Payments对象被构建时包含了空列表,或者如果流处理需要时间,则会因为尝试同步获取异步结果而导致阻塞(尽管此处代码本身不会阻塞主线程,但它无法正确获取异步结果)。
- 非响应式: 这种模式实际上将响应式流的异步结果拉回到命令式代码中处理,破坏了整个操作的响应式特性。为了正确等待结果,开发者可能会引入block()操作,从而彻底失去了非阻塞的优势。
解决方案:使用Mono.zipWith实现非阻塞聚合
Project Reactor提供了zip系列操作符来解决这种并发聚合问题。Mono.zipWith(或静态方法Mono.zip)是专门用于将两个或多个Mono的结果合并成一个新Mono的强大工具。
其核心思想是:当所有参与zip操作的源Mono都成功发出其元素时,zip操作符会收集这些元素,并将它们作为参数传递给一个提供的BiFunction(或Function),该函数负责将这些元素组合成一个新的结果,然后由zip返回的Mono发出这个新结果。
在我们的场景中,我们需要将两个Flux转换为Mono,然后对这两个Mono
进行zip操作。Flux.collectList()操作符正是为此而生,它将一个Flux
>。
以下是使用Mono.zipWith的正确实现:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] args) {
getPaymentData().subscribe(System.out::println);
}
public static Mono<Payments> getPaymentData() {
Flux<Payments.SuccessAccount> accountsSucceededFlux = getAccountsSucceeded();
Flux<Payments.FailedAccount> accountsFailedFlux = getAccountsFailed();
// 1. 将Flux转换为Mono<List>
Mono<List<Payments.FailedAccount>> failedAccountsMono = accountsFailedFlux.collectList();
Mono<List<Payments.SuccessAccount>> successAccountsMono = accountsSucceededFlux.collectList();
// 2. 使用zipWith合并两个Mono<List>的结果
return failedAccountsMono.zipWith(
successAccountsMono,
(failedAccounts, successAccounts) -> Payments.builder()
.failedAccounts(failedAccounts)
.successAccounts(successAccounts)
.build()
);
}
// 模拟获取成功账户的Flux
public static Flux<Payments.SuccessAccount> getAccountsSucceeded() {
return Flux.just(Payments.SuccessAccount.builder()
.accountNumber("1234345")
.name("Payee1")
.build(),
Payments.SuccessAccount.builder()
.accountNumber("83673674")
.name("Payee2")
.build());
}
// 模拟获取失败账户的Flux
public static Flux<Payments.FailedAccount> getAccountsFailed() {
return Flux.just(Payments.FailedAccount.builder()
.accountNumber("12234345")
.name("Payee3")
.errorCode("8938")
.build(),
Payments.FailedAccount.builder()
.accountNumber("3342343")
.name("Payee4")
.errorCode("8938")
.build());
}
}代码解析:
- failedAccountsFlux.collectList(): 这个操作符将Flux
转换成一个Mono - >。这意味着当原始的Flux发出所有元素并完成时,collectList()会收集这些元素到一个列表中,并将这个列表作为单一元素由返回的Mono发出。
- failedAccountsMono.zipWith(successAccountsMono, ...):
- failedAccountsMono是第一个源Mono,它会发出失败账户列表。
- successAccountsMono是第二个源Mono,它会发出成功账户列表。
- BiFunction (failedAccounts, successAccounts) -> ... 是一个组合函数。当两个源Mono都成功发出它们的列表时,zipWith会调用这个函数,将两个列表作为参数传入。
- 在这个函数内部,我们使用Payments.builder()来构建最终的Payments对象,将两个列表分别设置到failedAccounts和successAccounts字段中。
- 非阻塞性: 整个链条都是非阻塞的。zipWith会等待两个上游Mono都完成后才执行组合函数,并且它本身返回一个Mono
,允许消费者在它发出结果时进行订阅和处理,而无需在中间环节阻塞。
总结与最佳实践
- 保持响应式链条完整: 在Reactor中,避免在中间操作中调用subscribe()。subscribe()应该作为整个响应式链的最后一个操作,用于触发执行并处理最终结果。在链条中间,应使用各种操作符(如map、flatMap、filter、zip等)来转换和组合流。
- 利用zip系列操作符: 当需要将多个独立的异步结果聚合成一个复合结果时,Mono.zip或Flux.zip是理想的选择。它们确保所有依赖的异步操作都完成后才进行组合,同时保持非阻塞。
- collectList()的重要性: Flux.collectList()是将Flux转换为Mono
- 的关键操作。这在需要将一个元素流聚合成一个集合,并进一步参与Mono操作(如zipWith)时非常有用。
- 错误处理: zip操作符具有“快速失败”的特性。如果任何一个参与zip的源Mono发出错误,那么整个zip操作返回的Mono也会立即发出该错误,而不会等待其他源完成。在实际应用中,应考虑如何使用onErrorResume、doOnError等操作符进行错误处理。
通过遵循这些原则,开发者可以构建出高效、健壮且完全响应式的应用程序,充分利用Project Reactor带来的并发和非阻塞优势。
以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。
Emmet快捷键使用技巧与输入优化指南
- 上一篇
- Emmet快捷键使用技巧与输入优化指南
- 下一篇
- Linux用户组管理全攻略
-
- 文章 · java教程 | 2小时前 |
- Fabric1.19.3自定义物品添加方法
- 219浏览 收藏
-
- 文章 · java教程 | 2小时前 |
- Java基础:虚引用怎么用?
- 137浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- AppUpdateManager初始化错误解决方法
- 165浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java异常会触发事务回滚吗?事务回滚详解
- 366浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java变量生命周期管理方法
- 398浏览 收藏
-
- 文章 · java教程 | 3小时前 | java 类实例
- Java获取Class对象的四种方法
- 183浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- Java全局异常处理方法与应用
- 307浏览 收藏
-
- 文章 · java教程 | 4小时前 | java
- WeakHashMap实现缓存淘汰机制解析
- 126浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- HashSet元素唯一性原理详解
- 176浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- BufferedReader和Scanner怎么选?
- 258浏览 收藏
-
- 文章 · java教程 | 4小时前 | treemap 键排序
- TreeMap按键排序方法与技巧解析
- 361浏览 收藏
-
- 文章 · java教程 | 5小时前 |
- Java浮点数比较技巧与避坑指南
- 476浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3345次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3557次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3589次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4714次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3962次使用
-
- 提升Java功能开发效率的有力工具:微服务架构
- 2023-10-06 501浏览
-
- 掌握Java海康SDK二次开发的必备技巧
- 2023-10-01 501浏览
-
- 如何使用java实现桶排序算法
- 2023-10-03 501浏览
-
- Java开发实战经验:如何优化开发逻辑
- 2023-10-31 501浏览
-
- 如何使用Java中的Math.max()方法比较两个数的大小?
- 2023-11-18 501浏览

