当前位置:首页 > 文章列表 > 文章 > python教程 > Python连接RabbitMQ教程详解

Python连接RabbitMQ教程详解

2025-08-29 13:27:56 0浏览 收藏

知识点掌握了,还需要不断练习才能熟练运用。下面golang学习网给大家带来一个文章开发实战,手把手教大家学习《Python用pika连接RabbitMQ教程》,在实现功能的过程中也带大家重新温习相关知识点,温故而知新,回头看看说不定又有不一样的感悟!

使用Python通过Pika操作RabbitMQ的核心步骤为:1. 建立连接(BlockingConnection);2. 创建通道(Channel);3. 声明持久化队列(queue_declare,durable=True);4. 发布消息时设置消息持久化(delivery_mode=2);5. 消费者手动确认消息(auto_ack=False,basic_ack)。选择RabbitMQ因其基于AMQP协议,具备高可靠性、丰富的交换机类型和成熟生态,适合需要复杂路由与消息不丢失的场景。Pika的同步模式(BlockingConnection)适用于简单脚本,逻辑直观但阻塞线程;异步模式(如SelectConnection)适用于高并发服务,通过事件循环提升吞吐量,但编程复杂度更高。消息持久化需同时设置队列和消息的durable与delivery_mode=2,确保服务重启后消息可恢复;确认机制通过关闭auto_ack并手动调用basic_ack实现,保证消息被成功处理前不会丢失,支持“至少一次”投递,要求消费者具备幂等性。完整实现包括生产者发送5条消息并休眠,消费者接收后模拟处理耗时并发送确认,确保消息可靠传递与处理。

Python怎样操作消息队列?pika连接RabbitMQ

Python操作消息队列,Pika连接RabbitMQ,这组合在很多后端系统里简直是标配。它提供了一种可靠的异步通信机制,让不同服务间解耦,处理高并发任务变得游刃有余。通过Pika库,Python应用可以轻松地发布消息到队列,也能消费队列中的消息,实现服务间的有效协作。

解决方案

要用Python通过Pika操作RabbitMQ,核心步骤围绕着连接(Connection)、通道(Channel)、声明队列/交换机、发布消息和消费消息。最直接的方式是使用Pika的BlockingConnection,它简单易用,适合快速开发和非高并发场景。

生产者(发布消息)示例:

import pika
import time

# 连接RabbitMQ服务器
# 这里假设RabbitMQ运行在本地,没有用户名密码
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明一个队列,如果队列不存在则创建。durable=True表示队列持久化
# 即使RabbitMQ重启,队列也不会丢失
channel.queue_declare(queue='my_queue', durable=True)

message_count = 0
while message_count < 5:
    message = f"Hello World! Message number {message_count}"
    # 发布消息到默认交换机,路由键为队列名
    # delivery_mode=2表示消息持久化,即使RabbitMQ重启,消息也不会丢失
    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 使消息持久化
        )
    )
    print(f" [x] Sent '{message}'")
    message_count += 1
    time.sleep(1) # 模拟发送间隔

connection.close()

消费者(消费消息)示例:

import pika
import time

# 连接RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明相同的队列,确保消费者知道要从哪个队列取消息
channel.queue_declare(queue='my_queue', durable=True)

print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    """
    消息处理回调函数
    ch: channel对象
    method: 包含消息的 delivery tag 等信息
    properties: 消息属性
    body: 消息体
    """
    print(f" [x] Received '{body.decode()}'")
    time.sleep(body.count(b'.')) # 模拟处理消息的耗时
    # 消息处理完成后,发送确认回执
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Done")

# 设置QoS (Quality of Service),每次只分发一条消息给消费者
# 这样可以防止一个消费者处理速度慢,导致所有消息堆积在它那里
channel.basic_qos(prefetch_count=1)

# 开始消费消息,no_ack=False表示需要手动发送确认回执
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

# 启动消费循环,会一直阻塞直到连接关闭
channel.start_consuming()

为什么选择RabbitMQ作为消息队列?

我个人觉得,RabbitMQ就像是消息队列界的“老黄牛”,它稳定、可靠、功能全面,是很多企业级应用的首选。你可能听说过Kafka在高吞吐量大数据场景的优势,但对于需要复杂路由、高可靠性、并且消息处理量并非天文数字的业务,RabbitMQ的优势就凸显出来了。

它基于AMQP(Advanced Message Queuing Protocol)协议,这个协议本身就为消息的可靠传输、事务性、路由等提供了强大的保障。这意味着,当你的系统对消息丢失零容忍时,RabbitMQ能给你足够的信心。它的交换机(Exchange)类型非常丰富,比如直连(Direct)、扇出(Fanout)、主题(Topic)、头部(Headers),可以满足各种复杂的路由需求。想象一下,你有一个订单系统,新订单消息可能需要同时通知库存、物流和客户服务部门,通过RabbitMQ的Topic交换机,一条消息就能精准地分发给所有相关方,这可比你手动维护多个HTTP请求或者数据库触发器要优雅和高效得多。

而且,RabbitMQ的社区非常活跃,文档也相当完善,遇到问题很容易找到解决方案。对于Python开发者来说,Pika库的支持也很好,虽然Pika的API有时候看起来有点“原生”,需要对AMQP概念有一定理解,但这正是它强大和灵活的体现。它的成熟度,让它在很多关键业务场景下,成为一个让人放心的选择。

Pika库的异步与同步模式有何不同?

Pika库提供了两种主要的工作模式:同步模式(BlockingConnection)和异步模式(如SelectConnectionTornadoConnection等)。这两种模式的选择,很大程度上取决于你的应用场景和对性能、并发处理的需求。

想象一下,你是个餐厅服务员。

同步模式 (BlockingConnection) 就像你一次只服务一位客人。你接到一个点餐请求,就一直等到菜做好、客人吃完、结账,你才去接下一个客人的请求。这种模式简单直接,逻辑清晰,不容易出错。对于那些一次只处理少量消息、或者在脚本中一次性发送一批消息然后退出的场景,BlockingConnection是完美的。它会阻塞当前线程,直到操作完成。比如,一个简单的日志收集脚本,把日志发到RabbitMQ,用BlockingConnection就足够了,写起来也很顺手。

异步模式 (SelectConnection, TornadoConnection等) 则像你同时服务多位客人。你接到点餐请求后,不是傻等,而是把点餐单交给厨房,然后立刻去接其他客人的请求,或者去处理其他事务(比如倒水、收拾桌子)。当厨房通知你菜好了,你再回来处理之前的点餐。这种模式复杂一些,因为你需要管理多个并发的事件,但它的效率非常高,不会因为一个耗时操作而阻塞整个应用。对于Web服务(如Django、Flask应用)、长连接服务、或者需要处理大量并发消息的消费者来说,异步模式是必不可少的。它能让你的应用在等待I/O(比如网络传输)的时候,去处理其他事情,大大提升了吞吐量和响应速度。当然,这也意味着你需要更深入地理解Python的异步编程模型,比如回调函数、事件循环等。虽然学习曲线可能陡峭一点,但一旦掌握,它能让你的Python应用在处理消息队列时如虎添翼。

消息的持久化与确认机制在Pika中如何实现?

在生产环境中,消息的持久化和确认机制是确保消息不丢失的关键。这两点在Pika中都有明确的实现方式,它们共同构筑了RabbitMQ“至少一次”消息投递的可靠性保障。

消息持久化:

消息持久化分为两个层面:队列持久化消息持久化

  1. 队列持久化: 当你声明队列时,将durable参数设为True

    channel.queue_declare(queue='my_queue', durable=True)

    这样做是为了防止RabbitMQ服务器重启后,你创建的队列消失。如果队列是非持久化的,服务器一重启,队列就不在了,即使里面有持久化的消息,也无处可寻了。

  2. 消息持久化: 当你发布消息时,通过pika.BasicProperties设置delivery_mode=2

    channel.basic_publish(
        exchange='',
        routing_key='my_queue',
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 2表示消息持久化
        )
    )

    这告诉RabbitMQ,这条消息需要写入磁盘。这样,即使在消息到达消费者并被确认之前,RabbitMQ服务器突然崩溃,重启后这条消息也能从磁盘中恢复,并重新投递。

需要注意的是,即使消息和队列都持久化了,也不能保证100%不丢消息。比如,在消息到达RabbitMQ并写入磁盘的极短时间内,如果服务器崩溃,消息可能还是会丢失。对于极端高可靠性的场景,你可能还需要结合发布者确认(Publisher Confirms)机制。

消息确认机制(Acknowledgements):

这是消费者端确保消息被成功处理的关键。当消费者从队列中获取一条消息后,它需要向RabbitMQ发送一个“确认回执”(Acknowledgement)。

  1. 关闭自动确认:basic_consume时,将auto_ack参数设为False

    channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)

    默认情况下,auto_ackTrue,这意味着RabbitMQ一旦把消息发给消费者,就认为消息已经被成功处理并从队列中删除。这显然是不安全的。

  2. 手动发送确认回执: 在你的消息回调函数中,当消息被成功处理后,调用channel.basic_ack()

    def callback(ch, method, properties, body):
        # ... 处理消息的逻辑 ...
        ch.basic_ack(delivery_tag=method.delivery_tag) # 确认消息

    delivery_tag是RabbitMQ分配给每条消息的唯一标识。通过发送确认回执,RabbitMQ就知道这条消息已经被消费者成功处理,可以安全地从队列中删除了。

如果消费者在处理消息过程中崩溃,或者没有发送确认回执,RabbitMQ会认为这条消息没有被成功处理,并在消费者重新连接或有其他消费者可用时,将这条消息重新投递给其他消费者。这保证了消息的“至少一次”投递:消息可能被投递多次,但绝不会丢失。当然,这也意味着你的消费者需要具备幂等性,即多次处理同一条消息不会产生副作用。

你也可以使用basic_nack(否定确认)或basic_reject来拒绝消息。basic_nack更灵活,可以指定是否将消息重新放回队列(requeue=True/False),而basic_reject通常用于彻底拒绝一条消息,且只能拒绝一条。在实际应用中,根据业务逻辑选择合适的确认方式,是构建健壮消息系统的关键。

到这里,我们也就讲完了《Python连接RabbitMQ教程详解》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于rabbitmq,消息持久化,消息确认,Pika,同步异步模式的知识点!

Vantage每日倒计时怎么用Vantage每日倒计时怎么用
上一篇
Vantage每日倒计时怎么用
Golangchannel实现观察者与事件驱动
下一篇
Golangchannel实现观察者与事件驱动
查看更多
最新文章
查看更多
课程推荐
  • 前端进阶之JavaScript设计模式
    前端进阶之JavaScript设计模式
    设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
    542次学习
  • GO语言核心编程课程
    GO语言核心编程课程
    本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
    511次学习
  • 简单聊聊mysql8与网络通信
    简单聊聊mysql8与网络通信
    如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
    498次学习
  • JavaScript正则表达式基础与实战
    JavaScript正则表达式基础与实战
    在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
    487次学习
  • 从零制作响应式网站—Grid布局
    从零制作响应式网站—Grid布局
    本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
    484次学习
查看更多
AI推荐
  • 千音漫语:智能声音创作助手,AI配音、音视频翻译一站搞定!
    千音漫语
    千音漫语,北京熠声科技倾力打造的智能声音创作助手,提供AI配音、音视频翻译、语音识别、声音克隆等强大功能,助力有声书制作、视频创作、教育培训等领域,官网:https://qianyin123.com
    419次使用
  • MiniWork:智能高效AI工具平台,一站式工作学习效率解决方案
    MiniWork
    MiniWork是一款智能高效的AI工具平台,专为提升工作与学习效率而设计。整合文本处理、图像生成、营销策划及运营管理等多元AI工具,提供精准智能解决方案,让复杂工作简单高效。
    418次使用
  • NoCode (nocode.cn):零代码构建应用、网站、管理系统,降低开发门槛
    NoCode
    NoCode (nocode.cn)是领先的无代码开发平台,通过拖放、AI对话等简单操作,助您快速创建各类应用、网站与管理系统。无需编程知识,轻松实现个人生活、商业经营、企业管理多场景需求,大幅降低开发门槛,高效低成本。
    413次使用
  • 达医智影:阿里巴巴达摩院医疗AI影像早筛平台,CT一扫多筛癌症急慢病
    达医智影
    达医智影,阿里巴巴达摩院医疗AI创新力作。全球率先利用平扫CT实现“一扫多筛”,仅一次CT扫描即可高效识别多种癌症、急症及慢病,为疾病早期发现提供智能、精准的AI影像早筛解决方案。
    427次使用
  • 智慧芽Eureka:更懂技术创新的AI Agent平台,助力研发效率飞跃
    智慧芽Eureka
    智慧芽Eureka,专为技术创新打造的AI Agent平台。深度理解专利、研发、生物医药、材料、科创等复杂场景,通过专家级AI Agent精准执行任务,智能化工作流解放70%生产力,让您专注核心创新。
    449次使用
微信登录更方便
  • 密码登录
  • 注册账号
登录即同意 用户协议隐私政策
返回登录
  • 重置密码