golang协程池设计详解
怎么入门Golang编程?需要学习哪些知识点?这是新手们刚接触编程时常见的问题;下面golang学习网就来给大家整理分享一些知识点,希望能够给初学者一些帮助。本篇文章就来介绍《golang协程池设计详解》,涉及到协程池,有需要的可以收藏一下
Why Pool
go自从出生就身带“高并发”的标签,其并发编程就是由groutine实现的,因其消耗资源低,性能高效,开发成本低的特性而被广泛应用到各种场景,例如服务端开发中使用的HTTP服务,在golang net/http包中,每一个被监听到的tcp链接都是由一个groutine去完成处理其上下文的,由此使得其拥有极其优秀的并发量吞吐量
for {
// 监听tcp
rw, e := l.Accept()
if e != nil {
.......
}
tempDelay = 0
c := srv.newConn(rw)
c.setState(c.rwc, StateNew) // before Serve can return
// 启动协程处理上下文
go c.serve(ctx)
}
虽然创建一个groutine占用的内存极小(大约2KB左右,线程通常2M左右),但是在实际生产环境无限制的开启协程显然是不科学的,比如上图的逻辑,如果来几千万个请求就会开启几千万个groutine,当没有更多内存可用时,go的调度器就会阻塞groutine最终导致内存溢出乃至严重的崩溃,所以本文将通过实现一个简单的协程池,以及剖析几个开源的协程池源码来探讨一下对groutine的并发控制以及多路复用的设计和实现。
一个简单的协程池
过年前做过一波小需求,是将主播管理系统中信息不完整的主播找出来然后再到其相对应的直播平台爬取完整信息并补全,当时考虑到每一个主播的数据都要访问一次直播平台所以就用应对每一个主播开启一个groutine去抓取数据,虽然这个业务量还远远远远达不到能造成groutine性能瓶颈的地步,但是心里总是不舒服,于是放假回来后将其优化成从协程池中控制groutine数量再开启爬虫进行数据抓取。思路其实非常简单,用一个channel当做任务队列,初始化groutine池时确定好并发量,然后以设置好的并发量开启groutine同时读取channel中的任务并执行, 模型如下图

实现
type SimplePool struct {
wg sync.WaitGroup
work chan func() //任务队列
}
func NewSimplePoll(workers int) *SimplePool {
p := &SimplePool{
wg: sync.WaitGroup{},
work: make(chan func()),
}
p.wg.Add(workers)
//根据指定的并发量去读取管道并执行
for i := 0; i
<p>测试</p>
<p>测试设定为在并发数量为20的协程池中并发抓取一百个人的信息, 因为代码包含较多业务逻辑所以sleep 1秒模拟爬虫过程,理论上执行时间为5秒</p>
<pre class="brush:plain;">
func TestSimplePool(t *testing.T) {
p := NewSimplePoll(20)
for i := 0; i
<p style="text-align: center"><img title="图片描述" alt="" referrerpolicy="no-referrer" src="/uploads/20221231/167244482363af7b970d9eb.jpg"></p>
<p>这样一来最简单的一个groutine池就完成了</p>
<p><span style="color: #ff0000"><strong>go-playground/pool</strong></span></p>
<p>上面的groutine池虽然简单,但是对于每一个并发任务的状态,pool的状态缺少控制,所以又去看了一下<a target='_blank' href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXuytMyero5ko5XFfIfNhNCyr5q5aZ7Eq2Grx4B-ZYpsg67JupujgGV528ZmqM6B3LGjgZiCnLB2l6uzgI1lfoCKsbK3yqGNrICas3icmJK3sWyR4H2qvIaGnrOmhWU' rel='nofollow'>go-playground/pool</a>的源码实现,先从每一个需要执行的任务入手,该库中对并发单元做了如下的结构体,可以看到除工作单元的值,错误,执行函数等,还用了三个分别表示,取消,取消中,写 的三个并发安全的原子操作值来标识其运行状态。</p>
<pre class="brush:plain;">
// 需要加入pool 中执行的任务
type WorkFunc func(wu WorkUnit) (interface{}, error)
// 工作单元
type workUnit struct {
value interface{} // 任务结果
err error // 任务的报错
done chan struct{} // 通知任务完成
fn WorkFunc
cancelled atomic.Value // 任务是否被取消
cancelling atomic.Value // 是否正在取消任务
writing atomic.Value // 任务是否正在执行
}
接下来看Pool的结构
type limitedPool struct {
workers uint // 并发量
work chan *workUnit // 任务channel
cancel chan struct{} // 用于通知结束的channel
closed bool // 是否关闭
m sync.RWMutex // 读写锁,主要用来保证 closed值的并发安全
}
初始化groutine池, 以及启动设定好数量的groutine
// 初始化pool,设定并发量
func NewLimited(workers uint) Pool {
if workers == 0 {
panic("invalid workers '0'")
}
p := &limitedPool{
workers: workers,
}
p.initialize()
return p
}
func (p *limitedPool) initialize() {
p.work = make(chan *workUnit, p.workers*2)
p.cancel = make(chan struct{})
p.closed = false
for i := 0; i
<p>往POOL中添加任务,并检查pool是否关闭</p>
<pre class="brush:plain;">
func (p *limitedPool) Queue(fn WorkFunc) WorkUnit {
w := &workUnit{
done: make(chan struct{}),
fn: fn,
}
go func() {
p.m.RLock()
if p.closed {
w.err = &ErrPoolClosed{s: errClosed}
if w.cancelled.Load() == nil {
close(w.done)
}
p.m.RUnlock()
return
}
// 将工作单元写入workChannel, pool启动后将由上面newWorker函数中读取执行
p.work
<p>在go-playground/pool包中, limitedPool的批量并发执行还需要借助batch.go来完成</p>
<pre class="brush:plain;">
// batch contains all information for a batch run of WorkUnits
type batch struct {
pool Pool // 上面的limitedPool实现了Pool interface
m sync.Mutex // 互斥锁,用来判断closed
units []WorkUnit // 工作单元的slice, 这个主要用在不设并发限制的场景,这里忽略
results chan WorkUnit // 结果集,执行完后的workUnit会更新其value,error,可以从结果集channel中读取
done chan struct{} // 通知batch是否完成
closed bool
wg *sync.WaitGroup
}
// go-playground/pool 中有设置并发量和不设并发量的批量任务,都实现Pool interface,初始化batch批量任务时会将之前创建好的Pool传入newBatch
func newBatch(p Pool) Batch {
return &batch{
pool: p,
units: make([]WorkUnit, 0, 4), // capacity it to 4 so it doesn't grow and allocate too many times.
results: make(chan WorkUnit),
done: make(chan struct{}),
wg: new(sync.WaitGroup),
}
}
// 往批量任务中添加workFunc任务
func (b *batch) Queue(fn WorkFunc) {
b.m.Lock()
if b.closed {
b.m.Unlock()
return
}
//往上述的limitPool中添加workFunc
wu := b.pool.Queue(fn)
b.units = append(b.units, wu) // keeping a reference for cancellation purposes
b.wg.Add(1)
b.m.Unlock()
// 执行完后将workUnit写入结果集channel
go func(b *batch, wu WorkUnit) {
wu.Wait()
b.results
<p>测试</p>
<pre class="brush:plain;">
func SendMail(int int) pool.WorkFunc {
fn := func(wu pool.WorkUnit) (interface{}, error) {
// sleep 1s 模拟发邮件过程
time.Sleep(time.Second * 1)
// 模拟异常任务需要取消
if int == 17 {
wu.Cancel()
}
if wu.IsCancelled() {
return false, nil
}
fmt.Println("send to", int)
return true, nil
}
return fn
}
func TestBatchWork(t *testing.T) {
// 初始化groutine数量为20的pool
p := pool.NewLimited(20)
defer p.Close()
batch := p.Batch()
// 设置一个批量任务的过期超时时间
t := time.After(10 * time.Second)
go func() {
for i := 0; i
<p style="text-align: center"><img title="图片描述" alt="" referrerpolicy="no-referrer" src="/uploads/20221231/167244482363af7b97764d2.jpg"><br></p>
<p style="text-align: center"><img title="图片描述" alt="" referrerpolicy="no-referrer" src="/uploads/20221231/167244482363af7b97d8a3f.jpg"><br></p>
<p>接近理论值5s, 通知模拟被取消的work也正常取消</p>
<p>go-playground/pool在比起之前简单的协程池的基础上, 对pool, worker的状态有了很好的管理。但是,但是问题来了,在第一个实现的简单groutine池和go-playground/pool中,都是先启动预定好的groutine来完成任务执行,在并发量远小于任务量的情况下确实能够做到groutine的复用,如果任务量不多则会导致任务分配到每个groutine不均匀,甚至可能出现启动的groutine根本不会执行任务从而导致浪费,而且对于协程池也没有动态的扩容和缩小。所以我又去看了一下ants的设计和实现。</p>
<p><span style="color: #ff0000"><strong>ants</strong></span></p>
<p><a target='_blank' href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXuytMyero5ko5XFfIfNhNCyr5q5aae7iWWlv6OBp319eq6-uptol6uEz62tfs6Sqrlph6pxYLyGm2S_fYGofmuDorLN0WyDhp_Rsa6VzoXdsqWGvX1iu6ybcQ' rel='nofollow'>ants</a>是一个受<a target='_blank' href='https://www.17golang.com/gourl/?redirect=MDAwMDAwMDAwML57hpSHp6VpkrqbYLx2eayza4KafaOkbLS3zqSBrJvPsa5_0Ia6sWuR4Juaq6t9nq5roGCUgXuytMyero5ko5XFfIfNhNCyr5q5aWK7iahpvpCwmHxrk6DIlrinmHqJ3K2tfs6B3LKkkrqBZK92gqC0jYqbfaN-a77Qs7GDhp6as3uG3oaVsbOFmIVhu6yKnrSKdW0' rel='nofollow'>fasthttp</a>启发的高性能协程池, fasthttp号称是比go原生的net/http快10倍,其快速高性能的原因之一就是采用了各种池化技术(这个日后再开新坑去读源码), ants相比之前两种协程池,其模型更像是之前接触到的数据库连接池,需要从空余的worker中取出一个来执行任务, 当无可用空余worker的时候再去创建,而当pool的容量达到上线之后,剩余的任务阻塞等待当前进行中的worker执行完毕将worker放回pool, 直至pool中有空闲worker。 ants在内存的管理上做得很好,除了定期清除过期worker(一定时间内没有分配到任务的worker),ants还实现了一种适用于大批量相同任务的pool, 这种pool与一个需要大批量重复执行的函数锁绑定,避免了调用方不停的创建,更加节省内存。</p>
<p>先看一下ants的pool 结构体 (pool.go)</p>
<pre class="brush:plain;">
type Pool struct {
// 协程池的容量 (groutine数量的上限)
capacity int32
// 正在执行中的groutine
running int32
// 过期清理间隔时间
expiryDuration time.Duration
// 当前可用空闲的groutine
workers []*Worker
// 表示pool是否关闭
release int32
// lock for synchronous operation.
lock sync.Mutex
// 用于控制pool等待获取可用的groutine
cond *sync.Cond
// 确保pool只被关闭一次
once sync.Once
// worker临时对象池,在复用worker时减少新对象的创建并加速worker从pool中的获取速度
workerCache sync.Pool
// pool引发panic时的执行函数
PanicHandler func(interface{})
}
接下来看pool的工作单元 worker (worker.go)
type Worker struct {
// worker 所属的poo;
pool *Pool
// 任务队列
task chan func()
// 回收时间,即该worker的最后一次结束运行的时间
recycleTime time.Time
}
执行worker的代码 (worker.go)
func (w *Worker) run() {
// pool中正在执行的worker数+1
w.pool.incRunning()
go func() {
defer func() {
if p := recover(); p != nil {
//若worker因各种问题引发panic,
//pool中正在执行的worker数 -1,
//如果设置了Pool中的PanicHandler,此时会被调用
w.pool.decRunning()
if w.pool.PanicHandler != nil {
w.pool.PanicHandler(p)
} else {
log.Printf("worker exits from a panic: %v", p)
}
}
}()
// worker 执行任务队列
for f := range w.task {
//任务队列中的函数全部被执行完后,
//pool中正在执行的worker数 -1,
//将worker 放回对象池
if f == nil {
w.pool.decRunning()
w.pool.workerCache.Put(w)
return
}
f()
//worker 执行完任务后放回Pool
//使得其余正在阻塞的任务可以获取worker
w.pool.revertWorker(w)
}
}()
}
了解了工作单元worker如何执行任务以及与pool交互后,回到pool中查看其实现, pool的核心就是取出可用worker提供给任务执行 (pool.go)
// 向pool提交任务
func (p *Pool) Submit(task func()) error {
if 1 == atomic.LoadInt32(&p.release) {
return ErrPoolClosed
}
// 获取pool中的可用worker并向其任务队列中写入任务
p.retrieveWorker().task = 0 {
w = idleWorkers[n]
idleWorkers[n] = nil
p.workers = idleWorkers[:n]
p.lock.Unlock()
} else if p.Running()
<p>在批量并发任务的执行过程中, 如果有超过5纳秒(ants中默认worker过期时间为5ns)的worker未被分配新的任务,则将其作为过期worker清理掉,从而保证pool中可用的worker都能发挥出最大的作用以及将任务分配得更均匀<br>
(pool.go)</p>
<pre class="brush:plain;">
// 该函数会在pool初始化后在协程中启动
func (p *Pool) periodicallyPurge() {
// 创建一个5ns定时的心跳
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()
for range heartbeat.C {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if len(idleWorkers) == 0 && p.Running() == 0 && atomic.LoadInt32(&p.release) == 1 {
p.lock.Unlock()
return
}
n := -1
for i, w := range idleWorkers {
// 因为pool 的worker队列是先进后出的,所以正序遍历可用worker时前面的往往里当前时间越久
if currentTime.Sub(w.recycleTime) -1 {
// 全部过期
if n >= len(idleWorkers)-1 {
p.workers = idleWorkers[:0]
} else {
// 部分过期
p.workers = idleWorkers[n+1:]
}
}
p.lock.Unlock()
}
}
测试
func TestAnts(t *testing.T) {
wg := sync.WaitGroup{}
pool, _ := ants.NewPool(20)
defer pool.Release()
for i := 0; i
<p style="text-align: center"><img title="图片描述" alt="" referrerpolicy="no-referrer" src="/uploads/20221231/167244482463af7b98516ef.jpg"><br></p>
<p>这里虽只简单的测试批量并发任务的场景, 如果大家有兴趣可以去看看ants的压力测试, ants的吞吐量能够比原生groutine高出N倍,内存节省10到20倍, 可谓是协程池中的神器。</p>
<blockquote>
<p>借用ants作者的原话来说:<br></p>
<p>然而又有多少场景是单台机器需要扛100w甚至1000w同步任务的?基本没有啊!结果就是造出了屠龙刀,可是世界上没有龙啊!也是无情…<br></p>
</blockquote>
<p><span style="color: #ff0000"><strong>Over</strong></span></p>
<p>一口气从简单到复杂总结了三个协程池的实现,受益匪浅, 感谢各开源库的作者, 虽然世界上没有龙,但是屠龙技是必须练的,因为它就像存款,不一定要全部都用了,但是一定不能没有!</p>
<p>终于介绍完啦!小伙伴们,这篇关于《golang协程池设计详解》的介绍应该让你收获多多了吧!欢迎大家收藏或分享给更多需要学习的朋友吧~golang学习网公众号也会发布Golang相关知识,快来关注吧!</p>
Golang实现请求限流的几种办法(小结)
- 上一篇
- Golang实现请求限流的几种办法(小结)
- 下一篇
- golang实现对docker容器心跳监控功能
-
- Golang · Go教程 | 2小时前 | 格式化输出 printf fmt库 格式化动词 Stringer接口
- Golangfmt库用法与格式化技巧解析
- 140浏览 收藏
-
- Golang · Go教程 | 2小时前 |
- Golang配置Protobuf安装教程
- 147浏览 收藏
-
- Golang · Go教程 | 2小时前 |
- Golang中介者模式实现与通信解耦技巧
- 378浏览 收藏
-
- Golang · Go教程 | 2小时前 |
- Golang多协程通信技巧分享
- 255浏览 收藏
-
- Golang · Go教程 | 2小时前 |
- Golang如何判断变量类型?
- 393浏览 收藏
-
- Golang · Go教程 | 2小时前 |
- Golang云原生微服务实战教程
- 310浏览 收藏
-
- Golang · Go教程 | 3小时前 |
- Golang迭代器与懒加载结合应用
- 110浏览 收藏
-
- Golang · Go教程 | 3小时前 | 性能优化 并发安全 Golangslicemap 预设容量 指针拷贝
- Golangslicemap优化技巧分享
- 412浏览 收藏
-
- Golang · Go教程 | 3小时前 |
- Golang代理模式与访问控制实现解析
- 423浏览 收藏
-
- Golang · Go教程 | 4小时前 |
- Golang事件管理模块实现教程
- 274浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3163次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3375次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3403次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4506次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3784次使用
-
- Golang协程池gopool设计与实现
- 2023-01-07 432浏览
-
- golang协程池模拟实现群发邮件功能
- 2022-12-26 125浏览
-
- golang 40行代码实现通用协程池
- 2022-12-26 186浏览
-
- 详解使用golang协程池的方法
- 2023-03-10 442浏览
-
- Golang协程池的实现与应用
- 2023-05-12 344浏览

