Rabbit消费者已连接,但在一段时间后无法从队列接收信息
来源:stackoverflow
2024-02-07 17:42:23
0浏览
收藏
一分耕耘,一分收获!既然都打开这篇《Rabbit消费者已连接,但在一段时间后无法从队列接收信息》,就坚持看下去,学下去吧!本文主要会给大家讲到等等知识点,如果大家对本文有好的建议或者看到有不足之处,非常欢迎大家积极提出!在后续文章我会继续更新Golang相关的内容,希望对大家都有所帮助!
问题内容
我有一个已成功连接并使用来自 rabbitmq 的消息的代码。 但过了一段时间,出现此问题时,无论消费者如何连接,都无法接收消息。
package rabbitmq
import (
"context"
"fmt"
"os"
"runtime"
"time"
"github.com/getsentry/sentry-go"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
type RabbitMQ struct {
conn *amqp.Connection
queues map[string]amqp.Queue
connString string
rabbitCloseError chan *amqp.Error
recoveryConsumer []RecoveryConsumer
// ch *amqp.Channel
// exchange_name string
}
type RecoveryConsumer struct {
queueName string
routingKey string
handler func(d amqp.Delivery)
concurrency int8
}
type (
Delivery = amqp.Delivery
)
func (r *RabbitMQ) IfExist(queueName string) bool {
for _, item := range r.recoveryConsumer {
if item.queueName == queueName {
return false
}
}
return true
}
func (r *RabbitMQ) RecoverConsumers() {
for _, i := range r.recoveryConsumer {
go r.StartConsumer(i.queueName, i.routingKey, i.handler, int(i.concurrency))
log.Infof("Consumer for %v successfully recovered", i.queueName)
}
}
func (r *RabbitMQ) Reconnector() {
for { //nolint
select {
case err := <-r.rabbitCloseError:
log.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server)
log.Debug("Reconnecting after connection closed")
sentry.CaptureException(fmt.Errorf("[RabbitMQ] Connection Closed : {'Reason': '%v', 'Code': '%v', 'Recoverable': '%v', 'Server_Side': '%v'", err.Reason, err.Code, err.Recover, err.Server))
r.connection()
r.RecoverConsumers()
}
}
}
func (r *RabbitMQ) Connect(host string, user string, pass string, virthost string) {
r.connString = "amqp://" + user + ":" + pass + "@" + host + "/"
if virthost != "/" || len(virthost) > 0 {
r.connString += virthost
}
r.connection()
go r.Reconnector()
}
func (r *RabbitMQ) connection() {
if r.conn != nil {
if !r.conn.IsClosed() {
return
} else {
log.Info("Reconnecting to RabbitMQ...")
}
}
var err error
r.conn, err = amqp.Dial(r.connString)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to connect to RabbitMQ", err)
}
r.conn.Config.Heartbeat = 5 * time.Second
r.queues = make(map[string]amqp.Queue)
r.rabbitCloseError = make(chan *amqp.Error)
r.conn.NotifyClose(r.rabbitCloseError)
log.Debug("[RabbitMQ] Successfully connected to RabbitMQ")
log.Infof("Number of Active Thread/Goroutine %v", runtime.NumGoroutine())
}
func (r *RabbitMQ) CreateChannel() *amqp.Channel {
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
return nil
}
return ch
}
func (r *RabbitMQ) QueueAttach(ch *amqp.Channel, name string) {
q, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("%s: %s", "Failed to declare a queue", err)
}
r.queues[name] = q
// r.ch.ExchangeDeclare()
}
func (r *RabbitMQ) TempQueueAttach(ch *amqp.Channel, name string) {
_, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
ch.Close()
log.Fatalf("%s: %s", "Failed to declare a temporary queue", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
}
}
func (r *RabbitMQ) Publish(ch *amqp.Channel, queue string, body []byte) {
span := sentry.StartSpan(context.TODO(), "publish message")
defer span.Finish()
err := ch.Publish(
"", // exchange
r.queues[queue].Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: map[string]interface{}{},
ContentType: "application/json",
ContentEncoding: "",
DeliveryMode: amqp.Persistent,
Priority: 0,
CorrelationId: "",
ReplyTo: "",
Expiration: "",
MessageId: "",
Timestamp: time.Now().UTC(),
Type: "",
UserId: "",
AppId: "",
Body: body,
})
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed to publish a message", err)
}
log.Debugf("Send message: %s", string(body))
}
func (r *RabbitMQ) StartConsumer(queueName string, routingKey string, handler func(d amqp.Delivery), concurrency int) {
// prefetch 4x as many messages as we can handle at once
ok := r.IfExist(queueName)
if ok {
r.recoveryConsumer = append(r.recoveryConsumer, RecoveryConsumer{
queueName: queueName,
routingKey: routingKey,
handler: handler,
concurrency: int8(concurrency),
})
}
ch, err := r.conn.Channel()
if err != nil {
log.Error(err)
}
prefetchCount := concurrency * 1
err = ch.Qos(prefetchCount, 0, false)
if err != nil {
sentry.CaptureException(err)
log.Errorf("%s: %s", "Failed QOS", err)
}
r.QueueAttach(ch, queueName)
msgs, err := ch.Consume(
queueName, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
sentry.CaptureException(err)
log.Fatalf("%s: %s", "Failed consume message", err)
sentry.CaptureException(fmt.Errorf("%s: %s", "Failed consume message", err))
os.Exit(1)
}
go func() {
for msg := range msgs {
handler(msg)
}
}()
}
func (r *RabbitMQ) WaitMessage(ch *amqp.Channel, queueName string, timeout time.Duration) []byte {
st := time.Now()
for time.Since(st).Seconds() < 1 {
msg, ok, err := ch.Get(queueName, true)
if err != nil {
log.Errorf("Can't consume queue. Error: %s", err.Error())
sentry.CaptureException(err)
return nil
}
if ok {
return msg.Body
}
time.Sleep(50 * time.Millisecond)
}
return nil
}
这可能是什么原因? 我知道它应该在 rabbit 端,但客户端库无法显示任何错误......
因为开始工作后,消费继续倾听,工作成功。
正确答案
我唯一可以建议你尝试的是使用心跳。这将检测连接是否由于网络故障等原因而中断。
您可以在这里查看:https://www.rabbitmq.com/heartbeats.html。
我对这个不太确定,自从我使用它以来已经很长时间了,但是如果你在接收消息位周围放置一个 try catch,当连接终止时它可能会出现在 catch 中。
希望这对您解决问题有所帮助。
文中关于的知识介绍,希望对你的学习有所帮助!若是受益匪浅,那就动动鼠标收藏这篇《Rabbit消费者已连接,但在一段时间后无法从队列接收信息》文章吧,也可关注golang学习网公众号了解相关技术文章。
版本声明
本文转载于:stackoverflow 如有侵犯,请联系study_golang@163.com删除
如何将Mac系统还原至初始版本
- 上一篇
- 如何将Mac系统还原至初始版本
- 下一篇
- Docker容器或docker compose在使用Postgres、Golang、Debian 11和Agora appbuilder后端时遇到错误
查看更多
最新文章
-
- Golang · Go问答 | 1年前 |
- 在读取缓冲通道中的内容之前退出
- 139浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 戈兰岛的全球 GOPRIVATE 设置
- 204浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将结构作为参数传递给 xml-rpc
- 325浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何用golang获得小数点以下两位长度?
- 478浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何通过 client-go 和 golang 检索 Kubernetes 指标
- 486浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将多个“参数”映射到单个可变参数的习惯用法
- 439浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 将 HTTP 响应正文写入文件后出现 EOF 错误
- 357浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 结构中映射的匿名列表的“复合文字中缺少类型”
- 352浏览 收藏
-
- Golang · Go问答 | 1年前 |
- NATS Jetstream 的性能
- 101浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何将复杂的字符串输入转换为mapstring?
- 440浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 相当于GoLang中Java将Object作为方法参数传递
- 212浏览 收藏
-
- Golang · Go问答 | 1年前 |
- 如何确保所有 goroutine 在没有 time.Sleep 的情况下终止?
- 143浏览 收藏
查看更多
课程推荐
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
查看更多
AI推荐
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3186次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3397次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3429次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4535次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3807次使用
查看更多
相关文章
-
- GoLand调式动态执行代码
- 2023-01-13 502浏览
-
- 用Nginx反向代理部署go写的网站。
- 2023-01-17 502浏览
-
- Golang取得代码运行时间的问题
- 2023-02-24 501浏览
-
- 请问 go 代码如何实现在代码改动后不需要Ctrl+c,然后重新 go run *.go 文件?
- 2023-01-08 501浏览
-
- 如何从同一个 io.Reader 读取多次
- 2023-04-11 501浏览

