Java接入Pulsar消息队列教程
从现在开始,我们要努力学习啦!今天我给大家带来《Java接入Pulsar消息队列全攻略》,感兴趣的朋友请继续看下去吧!下文中的内容我们主要会涉及到等等知识点,如果在阅读本文过程中有遇到不清楚的地方,欢迎留言呀!我们一起讨论,一起学习!
在Java中操作Pulsar消息队列需掌握客户端API的异步特性及消息生产与消费模式。1. 引入Pulsar客户端依赖;2. 初始化PulsarClient,配置serviceUrl、ioThreads、listenerThreads等参数;3. 创建生产者并配置批量发送、发送超时、压缩类型等参数以提升吞吐量和可靠性;4. 创建消费者并选择合适的订阅模式(Exclusive、Shared、Failover、Key_Shared)以满足不同业务场景对顺序性、并发性和高可用性的需求;5. 使用Schema实现端到端类型安全,通过Schema.JSON或Schema.AVRO等指定数据结构,避免运行时错误并简化开发;6. 正确处理消息确认机制(acknowledge、negativeAcknowledge)以确保消息可靠投递。合理配置各项参数并结合实际业务需求选择合适的消息模型,是高效使用Pulsar的关键。
在Java中操作Pulsar消息队列,核心在于理解其客户端API的异步特性以及消息的生产与消费模式。这不单单是调用几个方法那么简单,更深层次地,它关乎你如何设计消息流、处理并发、确保数据一致性,以及应对各种网络和服务端异常。在我看来,Pulsar的Java客户端设计得相当成熟,提供了一套强大而灵活的工具集,但要真正用好它,得跳出传统思维,拥抱异步编程范式。

解决方案
要构建一个完整的Java Pulsar消息操作方案,我们通常会从依赖引入开始,然后依次是客户端初始化、生产者配置与消息发送、消费者订阅与消息处理,以及一些高级特性如Schema的使用和错误处理。
首先,你得在项目的pom.xml
(如果你用Maven)或build.gradle
(如果你用Gradle)中引入Pulsar客户端库:

<!-- Maven --> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-java</artifactId> <version>2.11.0</version> <!-- 根据实际情况选择最新稳定版 --> </dependency>
接着,是Pulsar客户端的初始化。这是一个重量级对象,通常在应用启动时创建一次,并复用。
import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import java.util.concurrent.TimeUnit; import java.util.concurrent.CompletableFuture; public class PulsarOperation { private static PulsarClient client; private static final String SERVICE_URL = "pulsar://localhost:6650"; // 你的Pulsar服务地址 private static final String TOPIC_NAME = "persistent://public/default/my-java-topic"; private static final String SUBSCRIPTION_NAME = "my-java-subscription"; static { try { client = PulsarClient.builder() .serviceUrl(SERVICE_URL) .connectionTimeout(30, TimeUnit.SECONDS) // 连接超时 .ioThreads(8) // IO线程数,根据实际负载调整 .listenerThreads(8) // 监听线程数,用于处理回调 .enableTcpNoDelay(true) // 启用TCP_NODELAY // .authentication(AuthenticationFactory.token("YOUR_TOKEN")) // 如果需要认证 .build(); System.out.println("Pulsar client initialized successfully."); } catch (PulsarClientException e) { System.err.println("Failed to initialize Pulsar client: " + e.getMessage()); e.printStackTrace(); // 生产环境应有更健壮的错误处理 } } // 生产者示例 public void produceMessages() throws PulsarClientException { // 使用Schema.STRING,也可以是Schema.JSON(MyObject.class), Schema.AVRO等 Producer<String> producer = client.newProducer(Schema.STRING) .topic(TOPIC_NAME) .producerName("my-java-producer") .enableBatching(true) // 启用批量发送 .batchingMaxMessages(1000) // 批处理最大消息数 .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) // 批处理最大延迟 .sendTimeout(30, TimeUnit.SECONDS) // 发送超时 .blockIfQueueFull(true) // 如果发送队列满了,则阻塞 .create(); for (int i = 0; i < 10; i++) { String message = "Hello Pulsar from Java " + i; // 异步发送,推荐方式 producer.sendAsync(message) .thenAccept(msgId -> System.out.println("Message sent: " + message + ", ID: " + msgId)) .exceptionally(ex -> { System.err.println("Failed to send message: " + message + ", Error: " + ex.getMessage()); return null; }); // 同步发送,会阻塞当前线程直到消息发送成功或失败 // try { // MessageId msgId = producer.send(message); // System.out.println("Message sent synchronously: " + message + ", ID: " + msgId); // } catch (PulsarClientException e) { // System.err.println("Failed to send message synchronously: " + message + ", Error: " + e.getMessage()); // } } // 确保所有异步消息发送完成 producer.flush(); producer.close(); // 生产环境通常不会立即关闭,而是复用 } // 消费者示例 public void consumeMessages() throws PulsarClientException { Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic(TOPIC_NAME) .subscriptionName(SUBSCRIPTION_NAME) .subscriptionType(SubscriptionType.Shared) // 订阅类型,Shared, Exclusive, Failover, Key_Shared .messageListener((cons, msg) -> { // 使用消息监听器异步处理 try { System.out.println("Received message: " + msg.getValue() + " (ID: " + msg.getMessageId() + ")"); cons.acknowledge(msg); // 确认消息,表示处理成功 } catch (Exception e) { System.err.println("Error processing message: " + msg.getValue() + ", " + e.getMessage()); cons.negativeAcknowledge(msg); // 负确认,消息会被重新投递 } }) .subscribe(); System.out.println("Consumer started. Press Ctrl+C to exit."); // 保持主线程运行,以便消费者可以持续接收消息 try { Thread.currentThread().join(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { consumer.close(); } } public static void main(String[] args) throws PulsarClientException, InterruptedException { PulsarOperation op = new PulsarOperation(); // 生产消息 System.out.println("--- Producing messages ---"); op.produceMessages(); Thread.sleep(2000); // 等待消息发送完成 // 消费消息 System.out.println("--- Consuming messages ---"); op.consumeMessages(); // 应用关闭时关闭客户端 if (client != null) { client.close(); System.out.println("Pulsar client closed."); } } }
这只是一个基础框架,实际应用中会涉及更复杂的逻辑,比如异常重试、死信队列、事务消息、多租户管理等等。

如何在Java项目中高效配置Pulsar客户端与生产者?
高效地配置Pulsar客户端和生产者,不仅仅是设置几个参数那么简单,它更像是一门艺术,需要在吞吐量、延迟和资源消耗之间找到一个平衡点。我个人在实践中,发现很多性能问题其实都出在初始配置上。
对于PulsarClient
,核心在于serviceUrl
的正确性,以及ioThreads
和listenerThreads
的合理分配。ioThreads
主要处理网络IO,而listenerThreads
则处理消息回调,比如消费者收到消息后的处理逻辑。如果你有大量的消息处理逻辑,listenerThreads
设得太少,可能会成为瓶颈。此外,connectionTimeout
和operationTimeout
也挺关键,它们决定了客户端在网络不稳定时能有多大的容忍度。我通常会把connectionTimeout
设置得稍微长一些,比如30秒,给Pulsar集群足够的启动时间或者网络抖动恢复时间。
至于生产者,它的配置选项就更多了,直接影响到消息发送的效率和可靠性。
批量发送 (
enableBatching
,batchingMaxMessages
,batchingMaxPublishDelay
): 这是提升吞吐量的杀手锏。将多条小消息打包成一个批次发送,可以显著减少网络往返次数和CPU开销。batchingMaxMessages
控制批次大小,batchingMaxPublishDelay
控制批次的最大等待时间。我的经验是,如果你对延迟不那么敏感,可以适当增加batchingMaxPublishDelay
来换取更高的吞吐量。但如果追求低延迟,就需要权衡了。发送超时 (
sendTimeout
): 这是一个非常重要的参数。如果消息在指定时间内没有收到Pulsar的确认,就会被认为是发送失败。设置一个合理的超时时间,既能避免长时间阻塞,又能给Pulsar足够的处理时间。我通常会根据网络状况和Pulsar集群的负载情况来调整,比如5秒到30秒不等。发送队列满时的行为 (
blockIfQueueFull
): 当内部发送队列满了之后,生产者可以选择阻塞当前线程,或者立即失败。blockIfQueueFull(true)
在消息积压时会阻塞调用线程,这在某些场景下可以作为一种简单的流量控制手段,避免瞬时流量过大压垮生产者。但如果你的应用对延迟非常敏感,或者需要快速失败,那么可能需要考虑设置为false
并实现自己的重试逻辑。消息路由模式 (
messageRoutingMode
): 对于分区Topic,Pulsar提供了多种路由模式,比如RoundRobinPartition
(轮询)和SinglePartition
(单分区)。如果你需要保证某个Key的消息始终发送到同一个分区以保持严格的顺序,那么HashingStickinessConsistentHashing
或自定义MessageRouter
就显得非常重要。压缩 (
compressionType
): 对于大消息或者对网络带宽敏感的场景,启用消息压缩(如LZ4
,ZSTD
)能有效减少网络传输量。这虽然会增加一些CPU开销,但通常是值得的。
这些参数的组合使用,就像调配一杯咖啡,需要根据你应用的具体需求和Pulsar集群的实际负载来反复尝试和优化。没有一劳永逸的最佳配置,只有最适合你当前场景的配置。
Java消费者如何选择Pulsar订阅模式并处理消息确认机制?
Pulsar的订阅模式(Subscription Type)是其区别于其他MQ的一个亮点,它提供了非常灵活的消息分发策略,但也常常是新手容易混淆的地方。正确选择订阅模式是确保消息按预期被处理的关键,这直接影响到你的消费者应用如何扩展、如何处理故障。
Pulsar提供了四种主要的订阅模式:
Exclusive (独占模式): 这是最严格的模式。一个订阅只能有一个消费者连接到它。如果有多个消费者尝试连接同一个订阅,只有第一个会成功,其他会失败。这非常适合需要严格消息顺序的场景,或者当一个消息只能被一个消费者处理时。它的优点是简单,消息顺序有保证,但缺点是无法横向扩展,存在单点故障。我个人在做一些关键业务的审计日志处理时,会倾向于使用这种模式,确保每一条日志都按顺序被一个且仅一个处理器消费。
Shared (共享模式): 这是最常用的模式,也是实现负载均衡和高吞吐量的首选。一个订阅可以有多个消费者连接,Puler会轮询地将消息分发给这些消费者。消息的顺序性在分区级别无法保证(因为不同消息可能由不同消费者处理),但在单个消费者内部,通常还是有序的。它的优点是高可用、易于扩展,缺点是消息处理顺序不严格。大部分的微服务异步通信,我都会选择Shared模式。
Failover (灾备模式): 这种模式下,一个订阅也可以有多个消费者连接,但只有一个是"主"消费者,负责接收所有消息。其他消费者处于"备用"状态。当主消费者发生故障时,Pulsar会自动从备用消费者中选举一个新的主消费者来接管消息流。它提供了高可用性,同时又能在一定程度上保持消息的顺序性(在主消费者切换前)。这很适合那些需要高可用但又不能完全放弃顺序性的场景,比如一些状态机流转。
Key_Shared (键共享模式): 这是Pulsar 2.7.0版本后引入的一种高级模式,它结合了Shared和Exclusive的优点。在Key_Shared模式下,一个订阅可以有多个消费者,Pulsar会根据消息的
orderingKey
(或者messageKey
)将具有相同Key的消息发送给同一个消费者。这样,对于同一个Key的消息,可以保证严格的顺序性,而不同Key的消息则可以在多个消费者之间并行处理。这对于需要按业务ID(如订单ID、用户ID)进行顺序处理,但又希望整体并行处理的场景非常有用。我个人觉得这是Pulsar在消息分发策略上的一大创新,它很好地解决了“全局无序但局部有序”的业务需求。
消息确认机制 (Acknowledgment)
Pulsar的消息确认机制是确保消息可靠投递的关键。消费者接收到消息后,必须向Pulsar发送确认(ACK),Pulsar才会认为这条消息已被成功处理并可以删除。如果未确认,Pulsar会在一定时间后重新投递。
consumer.acknowledge(msg)
: 这是最常见的确认方式,表示单条消息处理成功。对于Shared
和Exclusive
模式,通常都用这个。consumer.acknowledgeCumulative(msg)
: 累积确认。它会确认包括当前消息在内的所有之前未确认的消息。这个方法主要用于Exclusive
和Failover
模式,因为这些模式下消息是严格有序的。如果乱用在Shared
模式下,可能会导致一些未处理的消息也被错误地确认掉。consumer.negativeAcknowledge(msg)
(NACK): 负确认。当你处理消息失败时(比如业务异常、数据解析错误),可以使用NACK。Pulsar会认为这条消息处理失败,并在稍后重新投递。NACK通常会有延迟投递的机制,避免立即重试导致死循环。合理使用NACK,配合重试策略和死信队列(Dead Letter Topic),可以构建非常健壮的错误处理流程。consumer.redeliverUnacknowledgedMessages()
: 这个方法可以手动触发Pulsar重新投递所有当前消费者未确认的消息。通常用于消费者在处理过程中遇到不可恢复的错误,需要快速重置状态,让消息重新回到队列中。
我建议,在设计消费者时,一定要仔细考虑你的业务场景对消息顺序、并发处理和容错能力的要求,然后选择最合适的订阅模式。同时,对于消息的ACK/NACK操作,务必放在try-catch
块中,确保即使业务逻辑失败,也能正确地进行负确认,避免消息丢失或重复消费。
Pulsar的Schema注册与Java客户端如何实现端到端类型安全?
在消息队列的世界里,数据格式的兼容性一直是个头疼的问题。如果生产者发送的数据格式变了,而消费者没有及时更新,那后果可能就是一堆解析错误和难以追踪的生产事故。Pulsar的Schema机制就是为了解决这个问题而生,它提供了一种端到端(end-to-end)的类型安全保障,让我这个曾经饱受JSON序列化/反序列化之苦的开发者感到欣慰。
Pulsar的Schema是内置在Broker上的,当你第一次用某个Schema发送消息到Topic时,Pulsar会自动注册这个Topic的Schema。后续的生产者和消费者,只要指定相同的Schema,Pulsar就会在消息发送和接收时进行类型校验。如果Schema不匹配,Pulsar会直接拒绝操作,而不是等到运行时才报错。
Java客户端对Schema的支持非常友好,主要通过Schema
类来实现。Pulsar支持多种Schema类型,包括:
Schema.BYTES
: 默认的,最原始的字节数组,没有类型校验。Schema.STRING
: 字符串类型,内部使用UTF-8编码。Schema.JSON(Class> pojoClass)
: 用于POJO的JSON序列化和反序列化。这是我用得最多的,因为它直观且易于调试。你只需要提供一个Java Bean类,Pulsar就会自动处理JSON的转换。Schema.AVRO(Class> pojoClass)
: 用于POJO的Avro序列化和反序列化。Avro是一种数据序列化系统,它依赖于Schema来定义数据结构,具有紧凑、快速、跨语言的特点。Schema.PROTOBUF(Class> pojoClass)
: 用于POJO的Protocol Buffers序列化和反序列化。Google的Protobuf也是一种高效的序列化方案,尤其在跨语言和性能要求高的场景下表现出色。Schema.AUTO_CONSUME()
/Schema.AUTO_PRODUCE()
: 自动Schema推断,通常用于消费者,让Pulsar自动根据消息的Schema信息进行反序列化。但生产环境,我更倾向于明确指定Schema,这样更可控。
实现端到端类型安全的核心步骤:
定义POJO类: 无论是JSON、Avro还是Protobuf,你都需要一个Java对象来表示你的消息结构。
public class MyMessage { private String field1; private int field2; // Getter, Setter, Constructors public MyMessage() {} public MyMessage(String field1, int field2) { this.field1 = field1; this.field2 = field2; } @Override public String toString() { return "MyMessage{" + "field1='" + field1 + '\'' + ", field2=" + field2 + '}'; } }
生产者使用Schema: 在创建生产者时,指定对应的Schema。
// 生产者创建时指定Schema Producer<MyMessage> producer = client.newProducer(Schema.JSON(MyMessage.class)) .topic(TOPIC_NAME) .create(); MyMessage msg = new MyMessage("Hello Schema", 123); producer.sendAsync(msg).thenAccept(msgId -> System.out.println("Sent typed message: " + msg + ", ID: " + msgId));
消费者使用Schema: 消费者同样需要指定Schema。
// 消费者创建时指定Schema Consumer<MyMessage> consumer = client.newConsumer(Schema.JSON(MyMessage.class)) .topic(TOPIC_NAME) .subscriptionName(SUBSCRIPTION_NAME) .subscriptionType(SubscriptionType.Shared) .messageListener((cons, message) -> { try { MyMessage receivedMsg = message.getValue(); // 直接获取到POJO对象 System.out.println("Received typed message: " + receivedMsg + " (ID: " + message.getMessageId() + ")"); cons.acknowledge(message); } catch (Exception e) { System.err.println("Error processing typed message: " + e.getMessage()); cons.negativeAcknowledge(message); } }) .subscribe();
Schema的好处与我的思考:
- 避免运行时错误: 最直接的好处就是,在消息生产或消费阶段,如果数据结构不匹配,Pulsar会在网络传输之前就告诉你,而不是等到你的业务逻辑去解析时才抛出
ClassCastException
或JsonParseException
。这极大地提升了开发效率和系统的健壮性。 - 简化开发: 你不再需要手动进行JSON字符串与POJO之间的转换,Pulsar客户端帮你搞定一切。
- 版本管理: Pulsar的Schema还支持版本管理和兼容性检查。你可以定义Schema的兼容性策略(如
AlwaysCompatible
,
本篇关于《Java接入Pulsar消息队列教程》的介绍就到此结束啦,但是学无止境,想要了解学习更多关于文章的相关知识,请关注golang学习网公众号!

- 上一篇
- PHP字符串格式化技巧全解析

- 下一篇
- 豆包AI处理Python字典教程详解
-
- 文章 · java教程 | 2小时前 |
- Docker部署Java应用详细教程
- 440浏览 收藏
-
- 文章 · java教程 | 2小时前 |
- Java文件压缩解压全攻略详解
- 182浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java开发数字孪生,Unity集成教程详解
- 452浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java并发工具类实战与场景解析
- 462浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Resilience4j断路器配置全解析
- 143浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java实现PDF电子签名方法解析
- 443浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- SpringBoot整合RabbitMQ教程详解
- 112浏览 收藏
-
- 文章 · java教程 | 3小时前 |
- Java日志异步优化技巧分享
- 429浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- JavaJSON库对比:Jackson、Gson与org.json详解
- 335浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- Java实现磁盘数据恢复与取证方法解析
- 491浏览 收藏
-
- 文章 · java教程 | 4小时前 | 资源 安全性 设计规范 HTTP方法 RESTfulAPI
- RESTfulAPI设计规范与实战解析
- 385浏览 收藏
-
- 文章 · java教程 | 4小时前 |
- SpringBoot整合GraphQL的实用技巧
- 237浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 511次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 498次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 边界AI平台
- 探索AI边界平台,领先的智能AI对话、写作与画图生成工具。高效便捷,满足多样化需求。立即体验!
- 418次使用
-
- 免费AI认证证书
- 科大讯飞AI大学堂推出免费大模型工程师认证,助力您掌握AI技能,提升职场竞争力。体系化学习,实战项目,权威认证,助您成为企业级大模型应用人才。
- 425次使用
-
- 茅茅虫AIGC检测
- 茅茅虫AIGC检测,湖南茅茅虫科技有限公司倾力打造,运用NLP技术精准识别AI生成文本,提供论文、专著等学术文本的AIGC检测服务。支持多种格式,生成可视化报告,保障您的学术诚信和内容质量。
- 561次使用
-
- 赛林匹克平台(Challympics)
- 探索赛林匹克平台Challympics,一个聚焦人工智能、算力算法、量子计算等前沿技术的赛事聚合平台。连接产学研用,助力科技创新与产业升级。
- 663次使用
-
- 笔格AIPPT
- SEO 笔格AIPPT是135编辑器推出的AI智能PPT制作平台,依托DeepSeek大模型,实现智能大纲生成、一键PPT生成、AI文字优化、图像生成等功能。免费试用,提升PPT制作效率,适用于商务演示、教育培训等多种场景。
- 570次使用
-
- 提升Java功能开发效率的有力工具:微服务架构
- 2023-10-06 501浏览
-
- 掌握Java海康SDK二次开发的必备技巧
- 2023-10-01 501浏览
-
- 如何使用java实现桶排序算法
- 2023-10-03 501浏览
-
- Java开发实战经验:如何优化开发逻辑
- 2023-10-31 501浏览
-
- 如何使用Java中的Math.max()方法比较两个数的大小?
- 2023-11-18 501浏览