关于golang监听rabbitmq消息队列任务断线自动重连接的问题
来源:脚本之家
2022-12-29 12:55:29
0浏览
收藏
来到golang学习网的大家,相信都是编程学习爱好者,希望在这里学习Golang相关编程知识。下面本篇文章就来带大家聊聊《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》,介绍一下消息队列、RabbitMQ、断线自动重连,希望对大家的知识积累有所帮助,助力实战开发!
golang监听消息队列rabbitmq任务脚本,当rabbimq消息队列断开连接后自动重试,重新唤起协程执行任务
需求背景:
goalng常驻内存任务脚本监听rbmq执行任务
任务脚本由supervisor来管理
当rabbitmq长时间断开连接会出现如下图 进程处于fatal状态
假如因为不可抗拒因素,rabbitmq服务器内存满了或者其它原因导致rabbitmq消息队列服务停止了
如果是短时间的停止重启,supervisor是可以即时唤醒该程序。如果服务器长时间没有恢复正常运行,程序就会出现fatal进程启动失败的状态,此时可以通过告警来提醒开发人员
如果以上告警能时时通知运维人员此问题可以略过了。今天讨论的是如果在长时间断开连接还能在服务器恢复正常情况下自动实现重连。
代码实现一:
消费者:
package main import ( "fmt" "github.com/ichunt2019/golang-rbmq-sl/utils/rabbitmq" ) type RecvPro struct { } //// 实现消费者 消费消息失败 自动进入延时尝试 尝试3次之后入库db /* 返回值 error 为nil 则表示该消息消费成功 否则消息会进入ttl延时队列 重复尝试消费3次 3次后消息如果还是失败 消息就执行失败 进入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { //time.Sleep(500*time.Microsecond) //return errors.New("顶顶顶顶") fmt.Println(string(dataByte)) //time.Sleep(1*time.Second) return nil //消息已经消费3次 失败了 请进行处理 如果消息 消费3次后 仍然失败 此处可以根据情况 对消息进行告警提醒 或者 补偿 入库db 钉钉告警等等 func (t *RecvPro) FailAction(err error,dataByte []byte) error { fmt.Println(err) fmt.Println("任务处理失败了,我要进入db日志库了") fmt.Println("任务处理失败了,发送钉钉消息通知主人") func main() { t := &RecvPro{} //rabbitmq.Recv(rabbitmq.QueueExchange{ // "a_test_0001", // "", // "amqp://guest:guest@192.168.2.232:5672/", //},t,5) /* runNums: 表示任务并发处理数量 一般建议 普通任务1-3 就可以了 */ err := rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.1.169:5672/", },t,4) if(err != nil){ fmt.Println(err) }
rabbitmq代码
package rabbitmq import ( "errors" "strconv" "time" //"errors" "fmt" "github.com/streadway/amqp" "log" ) // 定义全局变量,指针类型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定义生产者接口 type Producer interface { MsgContent() string } type RetryProducer interface { // 定义接收者接口 type Receiver interface { Consumer([]byte) error FailAction(error , []byte) error // 定义RabbitMQ对象 type RabbitMQ struct { connection *amqp.Connection Channel *amqp.Channel dns string QueueName string // 队列名称 RoutingKey string // key名称 ExchangeName string // 交换机名称 ExchangeType string // 交换机类型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver // 定义队列交换机对象 type QueueExchange struct { QuName string // 队列名称 RtKey string // key值 ExName string // 交换机名称 ExType string // 交换机类型 Dns string //链接地址 // 链接rabbitMQ func (r *RabbitMQ)MqConnect() (err error){ mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 赋值给RabbitMQ对象 if err != nil { fmt.Printf("rbmq链接失败 :%s \n", err) } return // 关闭mq链接 func (r *RabbitMQ)CloseMqConnect() (err error){ err = r.connection.Close() if err != nil{ fmt.Printf("关闭mq链接失败 :%s \n", err) func (r *RabbitMQ)MqOpenChannel() (err error){ mqConn := r.connection r.Channel, err = mqConn.Channel() //defer mqChan.Close() fmt.Printf("MQ打开管道失败:%s \n", err) return err func (r *RabbitMQ)CloseMqChannel() (err error){ r.Channel.Close() // 创建一个新的操作对象 func NewMq(q QueueExchange) RabbitMQ { return RabbitMQ{ QueueName:q.QuName, RoutingKey:q.RtKey, ExchangeName: q.ExName, ExchangeType: q.ExType, dns:q.Dns, func (mq *RabbitMQ) sendMsg (body string) (err error) { err = mq.MqOpenChannel() ch := mq.Channel log.Printf("Channel err :%s \n", err) defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) // 用于检查队列是否存在,已经存在不需要重复声明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) log.Printf("QueueDeclare err :%s \n", err) // 绑定任务 if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) log.Printf("QueueBind err :%s \n", err) if mq.ExchangeName != "" && mq.RoutingKey != ""{ err = mq.Channel.Publish( mq.ExchangeName, // exchange mq.RoutingKey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), }) }else{ "", // exchange mq.QueueName, // routing key /* 发送延时消息 */ func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){ err =mq.MqOpenChannel() return if ttl <p>实现重连方式很多,下面实现方式比较简单</p> <p style="text-align:center"><img alt="" src="/uploads/20221229/167229091663ad22644f2b1.png"></p> <p>1.Recv方法创建ampq链接</p> <p>2.启动协程开始执行任务 </p> <p style="margin-left:40px">MqOpenChannel 打开一个channel通道处理amqp消息</p> <p style="margin-left:40px">拿到消息 处理任务</p> <p> 3,协程中捕获异常发送消息到taskQuit </p><p> 4,主进程监听taskQuit管道 开始尝试重新链接amqp 直到链接成功</p> <p> 5,重新链接成功后启动新的协程处理任务</p> <p>主要代码分析:</p> <pre class="brush:java;">/* runNums 开启并发执行任务数量 */ func Recv(queueExchange QueueExchange,receiver Receiver,runNums int) (err error){ mq := NewMq(queueExchange) //链接rabbitMQ err = mq.MqConnect() if(err != nil){ return } //rbmq断开链接后 协程退出释放信号 taskQuit:= make(chan struct{}, 1) //尝试链接rbmq tryToLinkC := make(chan struct{}, 1) //开始执行任务 for i:=1;i <p>到这里,我们也就讲完了《关于golang监听rabbitmq消息队列任务断线自动重连接的问题》的内容了。个人认为,基础知识的学习和巩固,是为了更好的将其运用到项目中,欢迎关注golang学习网公众号,带你了解更多关于golang的知识点!</p>
版本声明
本文转载于:脚本之家 如有侵犯,请联系study_golang@163.com删除

- 上一篇
- 详解go语言中sort如何排序

- 下一篇
- Golang小数操作指南之判断小数点位数与四舍五入
评论列表
-
- 土豪的流沙
- 这篇文章内容真及时,很详细,真优秀,mark,关注博主了!希望博主能多写Golang相关的文章。
- 2023-01-17 07:28:36
-
- 粗暴的超短裙
- 很有用,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,帮助很大,总算是懂了,感谢师傅分享博文!
- 2023-01-15 04:37:36
-
- 清脆的中心
- 受益颇多,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢大佬分享技术贴!
- 2023-01-03 08:49:18
-
- 幸福的面包
- 这篇文章真及时,很详细,很有用,收藏了,关注楼主了!希望楼主能多写Golang相关的文章。
- 2023-01-03 04:50:26
查看更多
最新文章
-
- Golang · Go教程 | 7小时前 |
- Golangcompress库使用技巧分享
- 320浏览 收藏
-
- Golang · Go教程 | 7小时前 |
- Golang发送HTTP请求教程与客户端实现
- 277浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Golang高并发TCP服务器搭建教程
- 322浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Go调用Bash管道实现方法
- 266浏览 收藏
-
- Golang · Go教程 | 8小时前 | golang 接口 错误处理 中间件 ErrorResponse
- Golang接口错误统一处理方法
- 268浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Golang开发多任务爬虫调度器教程
- 138浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Golangerrors.As错误捕获详解
- 293浏览 收藏
-
- Golang · Go教程 | 8小时前 |
- Golang数据库加速技巧:SQL预处理与连接池实战
- 199浏览 收藏
-
- Golang · Go教程 | 9小时前 |
- Golang单元测试编写技巧与实战
- 337浏览 收藏
-
- Golang · Go教程 | 9小时前 |
- Golangdeferpanicrecover使用技巧详解
- 430浏览 收藏
查看更多
课程推荐
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 514次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 499次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
查看更多
AI推荐
-
- AI Mermaid流程图
- SEO AI Mermaid 流程图工具:基于 Mermaid 语法,AI 辅助,自然语言生成流程图,提升可视化创作效率,适用于开发者、产品经理、教育工作者。
- 597次使用
-
- 搜获客【笔记生成器】
- 搜获客笔记生成器,国内首个聚焦小红书医美垂类的AI文案工具。1500万爆款文案库,行业专属算法,助您高效创作合规、引流的医美笔记,提升运营效率,引爆小红书流量!
- 599次使用
-
- iTerms
- iTerms是一款专业的一站式法律AI工作台,提供AI合同审查、AI合同起草及AI法律问答服务。通过智能问答、深度思考与联网检索,助您高效检索法律法规与司法判例,告别传统模板,实现合同一键起草与在线编辑,大幅提升法律事务处理效率。
- 621次使用
-
- TokenPony
- TokenPony是讯盟科技旗下的AI大模型聚合API平台。通过统一接口接入DeepSeek、Kimi、Qwen等主流模型,支持1024K超长上下文,实现零配置、免部署、极速响应与高性价比的AI应用开发,助力专业用户轻松构建智能服务。
- 685次使用
-
- 迅捷AIPPT
- 迅捷AIPPT是一款高效AI智能PPT生成软件,一键智能生成精美演示文稿。内置海量专业模板、多样风格,支持自定义大纲,助您轻松制作高质量PPT,大幅节省时间。
- 584次使用
查看更多
相关文章
-
- go+redis实现消息队列发布与订阅的详细过程
- 2023-01-07 161浏览
-
- Golang rabbitMQ生产者消费者实现示例
- 2022-12-23 148浏览
-
- redisstream实现消息队列的实践
- 2022-12-31 260浏览
-
- Golang中优秀的消息队列NSQ基础安装及使用详解
- 2022-12-27 260浏览
-
- golang实现redis的延时消息队列功能示例
- 2023-01-17 368浏览