GoogleCloudPub/Sub筛选后无法拉取消息解决方法
在文章实战开发的过程中,我们经常会遇到一些这样那样的问题,然后要卡好半天,等问题解决了才发现原来一些细节知识点还是没有掌握好。今天golang学习网就整理分享《Google Cloud Pub/Sub订阅筛选后无法拉取消息解决方法》,聊聊,希望可以帮助到正在努力赚钱的你。
Google Cloud Pub/Sub 消息筛选器概述
Google Cloud Pub/Sub 是一种异步消息传递服务,用于解耦生产者和消费者。为了进一步优化消息处理,Pub/Sub 提供了消息筛选器(Message Filters)功能。通过在订阅上配置筛选器,消费者可以只接收那些满足特定条件(例如,消息属性匹配特定值或消息数据符合某种模式)的消息,从而减少不必要的消息处理负担,提高消费者端的效率和资源利用率。
问题描述:带筛选器的订阅客户端无法拉取消息
在使用 Python 客户端库与 Pub/Sub 交互时,有时会遇到一个令人困惑的现象:当订阅没有应用任何筛选器时,订阅客户端能够正常拉取并处理消息;但一旦为订阅配置了消息筛选器,即使 Pub/Sub 控制台显示订阅中有匹配筛选条件的消息积压,客户端却无法接收到任何消息,如同停止工作一般。
以下是典型的 Pub/Sub Python 订阅客户端代码结构:
import os import time # 导入time模块 import asyncio # 如果是异步应用,可能需要asyncio from google.cloud import pubsub_v1 # from app.services.subscription_service import save_bill_events # 示例业务逻辑 # from app.utils.constants import BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID # 示例常量 # from app.utils.logging_tracing_manager import get_logger # 示例日志 # logger = get_logger(__file__) # 示例日志初始化 def callback(message: pubsub_v1.subscriber.message.Message) -> None: # save_bill_events(message.data) # 示例:处理消息数据 print(f"Received message: {message.data.decode()}") message.ack() # 确认消息 # 假设这些常量已经定义 BILL_SUBSCRIPTION_GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-gcp-project-id") BILL_EVENT_SUBSCRIPTION_ID = "your-subscription-id" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path(BILL_SUBSCRIPTION_GCP_PROJECT_ID, BILL_EVENT_SUBSCRIPTION_ID) # Limit the subscriber to only have fixed number of outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=50) # streaming_pull_future 在这里定义,但实际启动拉取操作在 poll_bill_subscription 中 streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control) async def poll_bill_subscription(): # 在此处或在调用此函数之前,可以考虑添加延迟 # await asyncio.sleep(10) # 异步应用中使用 with subscriber: try: # When `timeout` is not set, result() will block indefinitely, # unless an exception is encountered first. print(f"Listening for messages on {subscription_path}...") streaming_pull_future.result() except Exception as e: print(f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}: {e}") # logger.error( # 示例日志 # f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}", # exc_info=True) pass # 示例:如何运行异步函数 # if __name__ == "__main__": # # 假设订阅是新创建的或刚刚应用了筛选器 # # 在这里添加一个延迟,等待订阅配置传播 # print("Waiting for subscription configuration to propagate...") # time.sleep(10) # 阻塞式等待10秒,适用于非async上下文 # asyncio.run(poll_bill_subscription())
根本原因分析:订阅创建与传播延迟
此问题的根本原因在于 Google Cloud Pub/Sub 服务的“最终一致性”特性。当您创建一个新的 Pub/Sub 订阅,尤其是在创建时立即为其配置了消息筛选器,或者在现有订阅上添加/修改了筛选器时,这些配置的变更需要一定的时间才能在 Pub/Sub 的全球分布式系统中完全传播和生效。
如果您的应用程序在订阅创建/更新完成后的极短时间内就初始化订阅客户端并尝试开始拉取消息,那么客户端可能在订阅的筛选器配置完全“就绪”之前就发出了请求。在这种情况下,Pub/Sub 服务可能无法正确识别或应用该筛选器,导致客户端无法接收到任何消息,尽管后台实际上有匹配筛选条件的消息正在等待。系统通常不会立即返回错误,而是表现为客户端“空转”,不拉取消息。
解决方案:引入启动延迟
解决此问题的最直接和有效的方法是在订阅客户端开始拉取消息之前,引入一个短暂的等待时间。这个延迟允许 Pub/Sub 系统有足够的时间来完成订阅配置的内部传播和同步。
以下是在上述 Python 代码中引入延迟的几种方式:
在主程序启动订阅拉取之前添加同步延迟: 如果您的应用程序在同步上下文中启动 Pub/Sub 消费者,可以在 subscriber.subscribe() 调用之前,或者在调用 poll_bill_subscription() 之前添加 time.sleep()。
import time # ... (之前的导入和客户端初始化代码) # 在初始化订阅客户端或开始拉取操作之前添加延迟 # 假设订阅是新创建的或刚刚应用了筛选器 print("Waiting for subscription configuration to propagate...") time.sleep(10) # 例如等待10秒,可以根据实际情况调整 streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback, flow_control=flow_control) async def poll_bill_subscription(): # ... (函数体不变)
在异步拉取函数内部添加异步延迟: 如果您的应用程序是基于 asyncio 的异步应用,并且 poll_bill_subscription 是一个 async 函数,那么可以在该函数内部使用 await asyncio.sleep()。
import asyncio # ... (之前的导入和客户端初始化代码) async def poll_bill_subscription(): # 在异步函数内部添加延迟 print("Waiting for subscription configuration to propagate asynchronously...") await asyncio.sleep(10) # 例如等待10秒 with subscriber: try: print(f"Listening for messages on {subscription_path}...") streaming_pull_future.result() except Exception as e: print(f"An error occurred while pulling message from subscription {BILL_EVENT_SUBSCRIPTION_ID}: {e}") pass
通过引入一个适当的延迟(例如 5 到 15 秒),可以显著提高订阅客户端在带有筛选器的订阅上成功拉取消息的可靠性。
注意事项与最佳实践
- 延迟时长: 没有一个固定的“最佳”延迟时长。它可能取决于 Pub/Sub 服务的当前负载、网络条件以及订阅配置的复杂性。建议从一个较小的值(如 5 秒)开始尝试,如果问题依然存在,则逐步增加延迟。在生产环境中,应通过监控和测试来确定一个稳健的延迟值。
- 幂等性与重试机制: 即使引入了延迟,也不能完全排除瞬时网络问题或服务暂时性故障。因此,应用程序应始终设计为具有幂等性(重复处理消息不会产生副作用),并实现健壮的重试机制,以应对任何潜在的拉取失败。
- 监控: 持续监控 Pub/Sub 订阅的关键指标至关重要,包括:
- 积压消息数量: 检查是否有消息积压但未被消费。
- 拉取请求速率: 确认客户端是否正在发送拉取请求。
- 订阅者错误日志: 留意客户端或 Pub/Sub 服务端报告的任何错误。 这些监控数据可以帮助您及时发现问题并调整策略。
- 部署策略: 在自动化部署流程中,如果您的部署包含创建或修改 Pub/Sub 订阅的步骤,那么在启动依赖于这些订阅的消费者服务之前,应考虑加入一个明确的等待或健康检查步骤,以确保订阅配置已完全生效。
- 非确定性问题: 这种延迟问题可能不是每次部署或启动都会发生,这增加了调试的难度。因此,即使在测试环境中没有复现,在生产环境中也应考虑添加这种启动延迟作为一种防御性编程措施。
总结
当 Google Cloud Pub/Sub 订阅客户端在应用了消息筛选器的订阅上无法拉取消息时,一个常见的但容易被忽视的原因是订阅配置在分布式系统中的传播延迟。通过在订阅客户端开始拉取操作之前引入一个适当的延迟,可以有效解决此问题,确保客户端在订阅完全就绪后才开始工作。同时,结合健壮的错误处理、重试机制和持续监控,可以构建更加可靠和弹性的 Pub/Sub 消息处理系统。
以上就是本文的全部内容了,是否有顺利帮助你解决问题?若是能给你带来学习上的帮助,请大家多多支持golang学习网!更多关于文章的相关知识,也可关注golang学习网公众号。

- 上一篇
- 电脑网络不稳定?路由器优化设置指南

- 下一篇
- JWT认证教程:Token生成与验证全解析
-
- 文章 · python教程 | 7分钟前 |
- DjangoNoReverseMatch错误解决教程
- 257浏览 收藏
-
- 文章 · python教程 | 16分钟前 |
- Python操作FTP服务器方法详解
- 106浏览 收藏
-
- 文章 · python教程 | 17分钟前 |
- Python开发区块链入门教程
- 270浏览 收藏
-
- 文章 · python教程 | 40分钟前 |
- Python实现AES加密方法详解
- 102浏览 收藏
-
- 文章 · python教程 | 52分钟前 | Python 数据处理 Pandas 分类数据 category类型
- Python快速转换分类数据为category类型
- 190浏览 收藏
-
- 文章 · python教程 | 54分钟前 |
- JSON数据处理全攻略
- 113浏览 收藏
-
- 文章 · python教程 | 55分钟前 |
- Pythonzip文件压缩教程详解
- 215浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- 正则匹配XMLHTML标签方法详解
- 279浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- TFIDF原理与TfidfVectorizer使用详解
- 391浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- Python实现WebSocket实时通信教程
- 245浏览 收藏
-
- 文章 · python教程 | 1小时前 |
- Python多变量配置技巧全解析
- 230浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 511次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 498次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- TextIn智能文字识别平台
- TextIn智能文字识别平台,提供OCR、文档解析及NLP技术,实现文档采集、分类、信息抽取及智能审核全流程自动化。降低90%人工审核成本,提升企业效率。
- 7次使用
-
- 简篇AI排版
- SEO 简篇 AI 排版,一款强大的 AI 图文排版工具,3 秒生成专业文章。智能排版、AI 对话优化,支持工作汇报、家校通知等数百场景。会员畅享海量素材、专属客服,多格式导出,一键分享。
- 7次使用
-
- 小墨鹰AI快排
- SEO 小墨鹰 AI 快排,新媒体运营必备!30 秒自动完成公众号图文排版,更有 AI 写作助手、图片去水印等功能。海量素材模板,一键秒刷,提升运营效率!
- 8次使用
-
- Aifooler
- AI Fooler是一款免费在线AI音频处理工具,无需注册安装,即可快速实现人声分离、伴奏提取。适用于音乐编辑、视频制作、练唱素材等场景,提升音频创作效率。
- 7次使用
-
- 易我人声分离
- 告别传统音频处理的繁琐!易我人声分离,基于深度学习的AI工具,轻松分离人声和背景音乐,支持在线使用,无需安装,简单三步,高效便捷。
- 8次使用
-
- Flask框架安装技巧:让你的开发更高效
- 2024-01-03 501浏览
-
- Django框架中的并发处理技巧
- 2024-01-22 501浏览
-
- 提升Python包下载速度的方法——正确配置pip的国内源
- 2024-01-17 501浏览
-
- Python与C++:哪个编程语言更适合初学者?
- 2024-03-25 501浏览
-
- 品牌建设技巧
- 2024-04-06 501浏览