基于golang的简单分布式延时队列服务的实现
本篇文章向大家介绍《基于golang的简单分布式延时队列服务的实现》,主要包括分布式、延时队列,具有一定的参考价值,需要的朋友可以参考一下。
一、引言
背景
我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈。但有时也会遇到非实时的任务,比如确定的时间点发布重要公告。或者需要在用户做了一件事情的X分钟/Y小时后,EG:
“PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励”
对其特定动作,比如通知、发券等等。一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延时队列服务。
名词解释
topic_list队列:每一个来的延时请求都应该又一个延时主题参考kafka,在逻辑上划分出一个队列出来每个业务分开处理;
topic_info队列:每一个队列topic都存在一个新的队列里,每次扫描topic信息检测新的topic建立与销毁管理服务协程数量;
offset:当前消费的进度;
new_offset:新消费的进度,预备更迭offset;
topic_offset_lock:分布式锁。
二、设计目标
功能清单
1、延时信息添加接口基于http调用
2、拥有存储队列特性,可保存近3天内的队列消费数据
3、提供消费功能
4、延时通知
性能指标
预计接口的调用量:单秒单类任务数3500,多秒单类任务数1300
压测结果:
简单压测
wrk写入qps:259.3s 写入9000条记录 单线程 无并发
触发性能/准确率:单秒1000,在测试机无延长。单秒3000时,偶尔出现1-2秒延迟。受内存和cpu影响。
三、系统设计
交互流程
时序图
本设计基于http接口调用,当向topic存在的队列中添加消息的时候,消息会被添加到相应topic队列的末尾储存,当添加到不存在的相应topic队列时,首先建立新topic队列,当定时器触发的时候或者分布式锁,抢到锁的实例先获得相应队列的offset,设置新offset,就可以释放锁了让给其他实例争抢,弹出队列头一定数量元素,然后拿到offset段的实例去存储中拿详细信息,在协程中处理,主要协程等待下次触发。然后添加协程去监控触发。
模块划分
1、队列存储模块
1·delay下的delay.base模块,主要负责接收写请求,将队列信息写入存储,不负责backend逻辑,调用存储模块
2、backend模块。delay下的delay.backend模块,负责时间触发扫描对应的topic队列,调用存储模块,主要负责访问读取存储模块,调用callback模块
1·扫描topic添加groutine
2·扫描topic_list消费信息
3·扫描topic_list如果一定时间没有消费到则关闭groutine
3、callback模块,主要负责发送已经到时间的数据,向相应服务通知
3、存储模块
1·分布式锁模块,系统多机部署,保证每次消费的唯一性,对每次topic消费的offset段进行上锁offset到new_offset段单机独享
2·topic管理列表,管理topic数量控制协程数
3·topic_list,消息队列
4·topic_info,消息实体,可能需要回调中会携带一些信息统一处理
4、唯一号生成模块。
五、缓存设计
目前使用全缓存模式
key设计:
topic管理list key: XX:DELAY_TOPIC_LIST type:list
topic_list key: XX:DELAY_SIMPLE_TOPIC_TASK-%s(根据topic分key) type:zset
topic_info key: XX:DELAY_REALL_TOPIC_TASK-%s(根据topic分key) type:hash
topic_offset key: XX:DELAY_TOPIC_OFFSET-%s(根据topic分key) type:string
topic_lock key: xx:DELAY_TOPIC_RELOAD_LOCK-%s(根据topic分key) type:string
六、接口设计
delay.task.addv1 (延时队列添加v1)
请求示例
curl -d '{ "topic": "xxx", // 业务topic "timing_moment": , // 单位秒,要定时时刻 "content": "{}" // 消息体,json串 }' 'http://127.0.0.1:xxxx/delay/task/add'
返回示例
{ "dm_error": 0, "error_msg": "操作成功", "task_id":112345465765 }
pull回调方式返回(v2不再支持)
请求示例
curl -d '{ "topic": "xxxx", // 业务topic "task_id":1324568798765 // taskid,选填,有则返回特定消息 }' 'http://127.0.0.1:xxxx/delay/task/pull'
返回示例
{ "dm_error": 0, "error_msg": "操作成功" "content":"{"\xxx"\}" }
delay.task.addv2 (延时队列添加v2)
请求示例
curl -d '{ "topic": "xxx", // 业务topic "timing_moment": , // 单位秒,要定时时刻 "content": "{ // 消息内容(json string) "sn":"message.call", // 服务发现名字(或为配置服务名) "url":"/ev/tp/xxxx", // 回调url "xxx":"xxx" // 其他字段 }" }' 'http://127.0.0.1:xxxx/delay/task/add'
示例
curl -d '{ "topic":"xxxx_push", "content":"{ "uid":"111111", "sn":"other.server", "url":"/xxxx/callback", "msg_type":"gift", }", "timing_moment":1565700615 }' http://127.0.0.1:xxxx/delay/task/add
返回示例
{ "dm_error": 0, "error_msg": "操作成功", "task_id":112345465765 }
七、MQ设计(v2不再支持)
关于kafka消费方式返回:
topic: delay_base_push 固定返回格式 { "topic": "xxxx", // 业务topic "content": "{}" // 单条生产消息content }
八、其他设计
唯一号设计
调用存储模块,利用redis的自增结合逻辑生成唯一号具体逻辑如下:
func (c *CacheManager) OperGenTaskid() (uint64, error) { now := time.Now().Unix() key := c.getDelayTaskIdKey() reply, err := c.DelayRds.Do("INCR", key) if err != nil { log.Errorf("genTaskid INCR key:%s, error:%s", key, err) return 0, err } version := reply.(int64) if version == 1 { //默认认为1秒能创建100个任务 c.DelayRds.Expire(key, time.Duration(100)*time.Second) } incrNum := version % 10000 taskId := (uint64(now)*10000 + uint64(incrNum)) log.Debugf("genTaskid INCR key:%s, taskId:%d", key, taskId) return taskId, nil }
分布式锁设计
func (c *CacheManager) SetDelayTopicLock(ctx context.Context, topic string) (bool, error) { key := c.getDelayTopicReloadLockKey(topic) reply, err := c.DelayRds.Do("SET", key, "lock", "NX", "EX", 2) if err != nil { log.Errorf("SetDelayTopicLock SETNX key:%s, cal:%v, error:%s", key, "lock", err) return false, err } if reply == nil { return false, nil } log.Debugf("SetDelayTopicLock SETNXEX topic:%s lock:%d", topic, false) return true, nil }
九、设计考虑
健壮性
熔断策略:
这版设计中有很多不足之处,当redis不可访问时,请求将大量积压给机器或者实例带来压力,导致其他服务不可用,所以采取降级策略(降级策略也有不足);在请求redis时加入重试,当重试次数多于报警次数,会记录一个原子操作atomic.StoreInt32(&stopFlag,1),其中stopFlag为一个全局的变量,在atomic.LoadInt32(&stopFlag)后,stopFlag的值为1则暂时不请求redis,同时记录当前时间,加入定时器,熔断器分为三个级别,开,关,半开,当定时器结束后stopFlag=2第二个定时将为半开状态计时,有概率访问redis,当成功次数到达阈值stopFlag=0,否则stopFlag=1继续计时
不足
1、调用time定时
通常golang 写循环执行的定时任务大概用三种实现方式:
1、time.Sleep方法:
for { time.Sleep(time.Second) fmt.Println("test") }
2、time.Tick函数:
t1:=time.Tick(3*time.Second) for { select { case <p>3、其中Tick定时任务,也可以先使用time.Ticker函数获取Ticker结构体,然后进行阻塞监听信息,这种方式可以手动选择停止定时任务,在停止任务时,减少对内存的浪费。</p> <pre class="brush:plain;"> t:=time.NewTicker(time.Second) for { select { case <p>在最开始以为sleep是单独处理直接停掉了这个协程,所以第一版用的也是sleep,但是在收集资料后发现这几种方式都创建了timer,并加入了定时任务处理协程。实际上这两个函数产生的timer都放入了同一个timer堆(golang时间轮),都在定时任务处理协程中等待被处理。Tick,Sleep,time.After函数都使用的timer结构体,都会被放在同一个协程中统一处理,这样看起来使用Tick,Sleep并没有什么区别。实际上是有区别的,本文不是讨论golang定时执行任务time.sleep和time.tick的优劣,以后会在后续文章进行探讨。使用channel阻塞协程完成定时任务比较灵活,可以结合select设置超时时间以及默认执行方法,而且可以设置timer的主动关闭,所以,建议使用time.Tick完成定时任务。</p> <p><strong>2、存储模块问题</strong></p> <p>目前是全缓存,没有DB参与,首先redis(codis)的高可用是个问题,在熔断之后采取“不作为”的判断也是有问题的,所以对未来展望,首先是:</p> <p>1·单机的数据结构使用多时间轮。为了减少数据的路程,将load数据的过程异步加载到机器,减少网络io所造成的时间损耗。同时也是减少对redis的依赖</p> <p>2·引入ZooKeeper或者添加集群备份,leader。保证集群中至少有两台机器load一个topic的数据,leader可以协调消费保证高可用</p> <p>今天关于《基于golang的简单分布式延时队列服务的实现》的内容介绍就到此结束,如果有什么疑问或者建议,可以在golang学习网公众号下多多回复交流;文中若有不正之处,也希望回复留言以告知!</p>

- 上一篇
- Go语言正则表达式的使用详解

- 下一篇
- golang 微服务之gRPC与Protobuf的使用
-
- 内向的板栗
- 真优秀,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢大佬分享博文!
- 2023-02-01 04:40:52
-
- 合适的红牛
- 这篇博文出现的刚刚好,太全面了,赞 ??,mark,关注作者了!希望作者能多写Golang相关的文章。
- 2023-01-21 14:20:11
-
- 悲凉的歌曲
- 这篇技术文章太及时了,太细致了,赞 ??,码起来,关注大佬了!希望大佬能多写Golang相关的文章。
- 2023-01-09 20:26:38
-
- 霸气的毛豆
- 这篇博文真及时,老哥加油!
- 2023-01-09 09:34:47
-
- 舒服的水杯
- 赞 ??,一直没懂这个问题,但其实工作中常常有遇到...不过今天到这,看完之后很有帮助,总算是懂了,感谢up主分享文章内容!
- 2023-01-08 08:34:43
-
- 无聊的牛排
- 很详细,收藏了,感谢老哥的这篇博文,我会继续支持!
- 2023-01-07 18:47:58
-
- 整齐的楼房
- 细节满满,mark,感谢作者大大的这篇文章,我会继续支持!
- 2023-01-01 01:13:30
-
- Golang · Go教程 | 30秒前 |
- GolangUDP可靠传输:序列号与ACK机制解析
- 216浏览 收藏
-
- Golang · Go教程 | 30秒前 |
- Golangdefer指针陷阱与延迟执行详解
- 474浏览 收藏
-
- Golang · Go教程 | 2分钟前 |
- Golang代理模式优化:接口缓存实现技巧
- 334浏览 收藏
-
- Golang · Go教程 | 6分钟前 |
- 结构体嵌入与方法继承,Go语言类型复用技巧
- 392浏览 收藏
-
- Golang · Go教程 | 6分钟前 |
- Golang配置HTTPS与Let'sEncrypt证书教程
- 181浏览 收藏
-
- Golang · Go教程 | 9分钟前 |
- 结构体嵌入与方法继承详解
- 278浏览 收藏
-
- Golang · Go教程 | 14分钟前 |
- Golang反射使用风险与性能分析
- 207浏览 收藏
-
- Golang · Go教程 | 16分钟前 |
- Golang信号量控制goroutine数量方法
- 133浏览 收藏
-
- Golang · Go教程 | 18分钟前 |
- Golang并发错误处理:goroutine错误传递解析
- 360浏览 收藏
-
- Golang · Go教程 | 21分钟前 |
- Golang高效读取大文件,bufio.Scanner分块解析
- 316浏览 收藏
-
- Golang · Go教程 | 23分钟前 |
- Golang扇入扇出原理与多路复用演示
- 282浏览 收藏
-
- Golang · Go教程 | 25分钟前 |
- Golang内存优化:降低GC压力技巧
- 266浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 542次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 511次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 498次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 484次学习
-
- 千音漫语
- 千音漫语,北京熠声科技倾力打造的智能声音创作助手,提供AI配音、音视频翻译、语音识别、声音克隆等强大功能,助力有声书制作、视频创作、教育培训等领域,官网:https://qianyin123.com
- 96次使用
-
- MiniWork
- MiniWork是一款智能高效的AI工具平台,专为提升工作与学习效率而设计。整合文本处理、图像生成、营销策划及运营管理等多元AI工具,提供精准智能解决方案,让复杂工作简单高效。
- 89次使用
-
- NoCode
- NoCode (nocode.cn)是领先的无代码开发平台,通过拖放、AI对话等简单操作,助您快速创建各类应用、网站与管理系统。无需编程知识,轻松实现个人生活、商业经营、企业管理多场景需求,大幅降低开发门槛,高效低成本。
- 107次使用
-
- 达医智影
- 达医智影,阿里巴巴达摩院医疗AI创新力作。全球率先利用平扫CT实现“一扫多筛”,仅一次CT扫描即可高效识别多种癌症、急症及慢病,为疾病早期发现提供智能、精准的AI影像早筛解决方案。
- 98次使用
-
- 智慧芽Eureka
- 智慧芽Eureka,专为技术创新打造的AI Agent平台。深度理解专利、研发、生物医药、材料、科创等复杂场景,通过专家级AI Agent精准执行任务,智能化工作流解放70%生产力,让您专注核心创新。
- 98次使用
-
- Go语言实战之实现一个简单分布式系统
- 2022-12-23 220浏览
-
- Go与Redis实现分布式互斥锁和红锁
- 2022-12-22 117浏览
-
- golang 基于 mysql 简单实现分布式读写锁
- 2023-01-07 384浏览
-
- Golang分布式应用之Redis示例详解
- 2023-01-07 113浏览
-
- Golang分布式应用定时任务示例详解
- 2022-12-24 269浏览