一文读懂go中semaphore(信号量)源码
IT行业相对于一般传统行业,发展更新速度更快,一旦停止了学习,很快就会被行业所淘汰。所以我们需要踏踏实实的不断学习,精进自己的技术,尤其是初学者。今天golang学习网给大家整理了《一文读懂go中semaphore(信号量)源码》,聊聊源码、gosemaphore,我们一起来看看吧!
运行时信号量机制 semaphore
前言
最近在看源码,发现好多地方用到了这个semaphore。
本文是在go version go1.13.15 darwin/amd64上进行的
作用是什么
下面是官方的描述
// Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case // of other synchronization primitives. // Thus it targets the same goal as Linux's futex, // but it has much simpler semantics. // // That is, don't think of these as semaphores. // Think of them as a way to implement sleep and wakeup // such that every sleep is paired with a single wakeup, // even if, due to races, the wakeup happens before the sleep. // 具体的用法是提供 sleep 和 wakeup 原语 // 以使其能够在其它同步原语中的竞争情况下使用 // 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的 // 只不过语义上更简单一些 // // 也就是说,不要认为这些是信号量 // 把这里的东西看作 sleep 和 wakeup 实现的一种方式 // 每一个 sleep 都会和一个 wakeup 配对 // 即使在发生 race 时,wakeup 在 sleep 之前时也是如此
上面提到了和futex作用一样,关于futex
futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具
Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。
go中的semaphore作用和futex目标一样,提供sleep和wakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。
例如在读写锁的实现中,读锁和写锁之前的相互阻塞唤醒,就是通过sleep和wakeup实现,当有读锁存在的时候,新加入的写锁通过semaphore阻塞自己,当前面的读锁完成,在通过semaphore唤醒被阻塞的写锁。
写锁
// 获取互斥锁
// 阻塞等待所有读操作结束(如果有的话)
func (rw *RWMutex) Lock() {
...
// 原子的修改readerCount的值,直接将readerCount减去rwmutexMaxReaders
// 说明,有写锁进来了,这在上面的读锁中也有体现
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 当r不为0说明,当前写锁之前有读锁的存在
// 修改下readerWait,也就是当前写锁需要等待的读锁的个数
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 阻塞当前写锁
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
...
}
通过runtime_SemacquireMutex对当前写锁进行sleep
读锁释放
// 减少读操作计数,即readerCount--
// 唤醒等待写操作的协程(如果有的话)
func (rw *RWMutex) RUnlock() {
...
// 首先通过atomic的原子性使readerCount-1
// 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作
// 2.若readerCount小于0, 证明已经没有读锁, 但是还有因为读锁被阻塞的写锁存在
if r := atomic.AddInt32(&rw.readerCount, -1); r
<p>写锁处理完之后,调用<code>runtime_Semrelease</code>来唤醒<code>sleep</code>的写锁</p>
<h3>几个主要的方法</h3>
<p>在<code>go/src/sync/runtime.go</code>中,定义了这几个方法</p>
<pre class="brush:plain;">
// Semacquire等待*s > 0,然后原子递减它。
// 它是一个简单的睡眠原语,用于同步
// library and不应该直接使用。
func runtime_Semacquire(s *uint32)
// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象
// 如果lifo为true,waiter将会被插入到队列的头部
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine
// 它是一个简单的唤醒原语,用于同步
// library and不应该直接使用。
// 如果handoff为true, 传递信号到队列头部的waiter
// skipframes是跟踪过程中要省略的帧数,从这里开始计算
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
具体的实现是在go/src/runtime/sema.go中
//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
func sync_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
如何实现
sudog 缓存
semaphore的实现使用到了sudog,我们先来看下
sudog 是运行时用来存放处于阻塞状态的goroutine的一个上层抽象,是用来实现用户态信号量的主要机制之一。 例如当一个goroutine因为等待channel的数据需要进行阻塞时,sudog会将goroutine及其用于等待数据的位置进行记录, 并进而串联成一个等待队列,或二叉平衡树。
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {
// 以下字段受hchan保护
g *g
// isSelect 表示 g 正在参与一个 select, so
// 因此 g.selectDone 必须以 CAS 的方式来获取wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // 数据元素(可能指向栈)
// 以下字段不会并发访问。
// 对于通道,waitlink只被g访问。
// 对于信号量,所有字段(包括上面的字段)
// 只有当持有一个semroot锁时才被访问。
acquiretime int64
releasetime int64
ticket uint32
parent *sudog //semaRoot 二叉树
waitlink *sudog // g.waiting 列表或 semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
sudog的获取和归还,遵循以下策略:
1、获取,首先从per-P缓存获取,对于per-P缓存,如果per-P缓存为空,则从全局池抓取一半,然后取出per-P缓存中的最后一个;
2、归还,归还到per-P缓存,如果per-P缓存满了,就把per-P缓存的一半归还到全局缓存中,然后归还sudog到per-P缓存中。
acquireSudog
1、如果per-P缓存的内容没达到长度的一般,则会从全局额缓存中抓取一半;
2、然后返回把per-P缓存中最后一个sudog返回,并且置空;
// go/src/runtime/proc.go
//go:nosplit
func acquireSudog() *sudog {
// Delicate dance: 信号量的实现调用acquireSudog,然后acquireSudog调用new(sudog)
// new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量
// 通过在new(sudog)周围执行acquirem/releasem来打破循环
// acquirem/releasem在new(sudog)期间增加m.locks,防止垃圾收集器被调用。
// 获取当前 g 所在的 m
mp := acquirem()
// 获取p的指针
pp := mp.p.ptr()
if len(pp.sudogcache) == 0 {
lock(&sched.sudoglock)
// 首先,尝试从中央缓存获取一批数据。
for len(pp.sudogcache)
<h4>releaseSudog</h4>
<p>1、如果<code>per-P</code>缓存满了,就归还<code>per-P</code>缓存一般的内容到全局缓存;</p>
<p>2、然后将回收的<code>sudog</code>放到<code>per-P</code>缓存中。</p>
<pre class="brush:plain;">
// go/src/runtime/proc.go
//go:nosplit
func releaseSudog(s *sudog) {
if s.elem != nil {
throw("runtime: sudog with non-nil elem")
}
if s.isSelect {
throw("runtime: sudog with non-false isSelect")
}
if s.next != nil {
throw("runtime: sudog with non-nil next")
}
if s.prev != nil {
throw("runtime: sudog with non-nil prev")
}
if s.waitlink != nil {
throw("runtime: sudog with non-nil waitlink")
}
if s.c != nil {
throw("runtime: sudog with non-nil c")
}
gp := getg()
if gp.param != nil {
throw("runtime: releaseSudog with non-nil gp.param")
}
// 避免重新安排到另一个P
mp := acquirem() // avoid rescheduling to another P
pp := mp.p.ptr()
// 如果缓存满了
if len(pp.sudogcache) == cap(pp.sudogcache) {
// 将本地高速缓存的一半传输到中央高速缓存
var first, last *sudog
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if first == nil {
first = p
} else {
last.next = p
}
last = p
}
lock(&sched.sudoglock)
last.next = sched.sudogcache
sched.sudogcache = first
unlock(&sched.sudoglock)
}
// 归还sudog到`per-P`缓存中
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}
semaphore
// go/src/runtime/sema.go
// 用于sync.Mutex的异步信号量。
// semaRoot拥有一个具有不同地址(s.elem)的sudog平衡树。
// 每个sudog都可以依次(通过s.waitlink)指向一个列表,在相同地址上等待的其他sudog。
// 对具有相同地址的sudog内部列表进行的操作全部为O(1)。顶层semaRoot列表的扫描为O(log n),
// 其中,n是阻止goroutines的不同地址的数量,通过他们散列到给定的semaRoot。
type semaRoot struct {
lock mutex
// waiters的平衡树的根节点
treap *sudog
// waiters的数量,读取的时候无所
nwait uint32
}
// Prime to not correlate with any user patterns.
const semTabSize = 251
var semtable [semTabSize]struct {
root semaRoot
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}
poll_runtime_Semacquire/sync_runtime_SemacquireMutex
// go/src/runtime/sema.go
//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
func poll_runtime_Semacquire(addr *uint32) {
semacquire1(addr, false, semaBlockProfile, 0)
}
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
// 判断这个goroutine,是否是m上正在运行的那个
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}
// *addr -= 1
if cansemacquire(addr) {
return
}
// 增加等待计数
// 再试一次 cansemacquire 如果成功则直接返回
// 将自己作为等待者入队
// 休眠
// (等待器描述符由出队信号产生出队行为)
// 获取一个sudog
s := acquireSudog()
root := semroot(addr)
t0 := int64(0)
s.releasetime = 0
s.acquiretime = 0
s.ticket = 0
if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
t0 = cputicks()
s.releasetime = -1
}
if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
if t0 == 0 {
t0 = cputicks()
}
s.acquiretime = t0
}
for {
lock(&root.lock)
// 添加我们自己到nwait来禁用semrelease中的"easy case"
atomic.Xadd(&root.nwait, 1)
// 检查cansemacquire避免错过唤醒
if cansemacquire(addr) {
atomic.Xadd(&root.nwait, -1)
unlock(&root.lock)
break
}
// 任何在 cansemacquire 之后的 semrelease 都知道我们在等待(因为设置了 nwait),因此休眠
// 队列将s添加到semaRoot中被阻止的goroutine中
root.queue(addr, s, lifo)
// 将当前goroutine置于等待状态并解锁锁。
// 通过调用goready(gp),可以使goroutine再次可运行。
goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
if s.ticket != 0 || cansemacquire(addr) {
break
}
}
if s.releasetime > 0 {
blockevent(s.releasetime-t0, 3+skipframes)
}
// 归还sudog
releaseSudog(s)
}
func cansemacquire(addr *uint32) bool {
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}
}
sync_runtime_Semrelease
// go/src/runtime/sema.go
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {
semrelease1(addr, handoff, skipframes)
}
func semrelease1(addr *uint32, handoff bool, skipframes int) {
root := semroot(addr)
atomic.Xadd(addr, 1)
// Easy case:没有等待者
// 这个检查必须发生在xadd之后,以避免错过唤醒
if atomic.Load(&root.nwait) == 0 {
return
}
// Harder case: 找到等待者,并且唤醒
lock(&root.lock)
if atomic.Load(&root.nwait) == 0 {
// 该计数已被另一个goroutine占用,
// 因此无需唤醒其他goroutine。
unlock(&root.lock)
return
}
// 搜索一个等待着然后将其唤醒
s, t0 := root.dequeue(addr)
if s != nil {
atomic.Xadd(&root.nwait, -1)
}
unlock(&root.lock)
if s != nil { // 可能会很慢,因此先解锁
acquiretime := s.acquiretime
if acquiretime != 0 {
mutexevent(t0-acquiretime, 3+skipframes)
}
if s.ticket != 0 {
throw("corrupted semaphore ticket")
}
if handoff && cansemacquire(addr) {
s.ticket = 1
}
// goready(s.g, 5)
// 标记 runnable,等待被重新调度
readyWithTime(s, 5+skipframes)
}
}
摘自"同步原语"的一段总结
这一对 semacquire 和 semrelease 理解上可能不太直观。 首先,我们必须意识到这两个函数一定是在两个不同的 M(线程)上得到执行,否则不会出现并发,我们不妨设为 M1 和 M2。 当 M1 上的 G1 执行到 semacquire1 时,如果快速路径成功,则说明 G1 抢到锁,能够继续执行。但一旦失败且在慢速路径下 依然抢不到锁,则会进入 goparkunlock,将当前的 G1 放到等待队列中,进而让 M1 切换并执行其他 G。 当 M2 上的 G2 开始调用 semrelease1 时,只是单纯的将等待队列的 G1 重新放到调度队列中,而当 G1 重新被调度时(假设运气好又在 M1 上被调度),代码仍然会从 goparkunlock 之后开始执行,并再次尝试竞争信号量,如果成功,则会归还 sudog。
参考
【同步原语】https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/sync/
【Go并发编程实战--信号量的使用方法和其实现原理】https://juejin.cn/post/6906677772479889422
【Semaphore】https://github.com/cch123/golang-notes/blob/master/semaphore.md
【进程同步之信号量机制(pv操作)及三个经典同步问题】https://blog.csdn.net/SpeedMe/article/details/17597373
理论要掌握,实操不能落!以上关于《一文读懂go中semaphore(信号量)源码》的详细介绍,大家都掌握了吧!如果想要继续提升自己的能力,那么就来关注golang学习网公众号吧!
jenkins构建go及java项目的方法
- 上一篇
- jenkins构建go及java项目的方法
- 下一篇
- Go各时间字符串使用解析
-
- Golang · Go教程 | 5小时前 | 格式化输出 printf fmt库 格式化动词 Stringer接口
- Golangfmt库用法与格式化技巧解析
- 140浏览 收藏
-
- Golang · Go教程 | 5小时前 |
- Golang配置Protobuf安装教程
- 147浏览 收藏
-
- Golang · Go教程 | 5小时前 |
- Golang中介者模式实现与通信解耦技巧
- 378浏览 收藏
-
- Golang · Go教程 | 5小时前 |
- Golang多协程通信技巧分享
- 255浏览 收藏
-
- Golang · Go教程 | 6小时前 |
- Golang如何判断变量类型?
- 393浏览 收藏
-
- Golang · Go教程 | 6小时前 |
- Golang云原生微服务实战教程
- 310浏览 收藏
-
- Golang · Go教程 | 6小时前 |
- Golang迭代器与懒加载结合应用
- 110浏览 收藏
-
- Golang · Go教程 | 7小时前 | 性能优化 并发安全 Golangslicemap 预设容量 指针拷贝
- Golangslicemap优化技巧分享
- 412浏览 收藏
-
- Golang · Go教程 | 7小时前 |
- Golang代理模式与访问控制实现解析
- 423浏览 收藏
-
- Golang · Go教程 | 7小时前 |
- Golang事件管理模块实现教程
- 274浏览 收藏
-
- 前端进阶之JavaScript设计模式
- 设计模式是开发人员在软件开发过程中面临一般问题时的解决方案,代表了最佳的实践。本课程的主打内容包括JS常见设计模式以及具体应用场景,打造一站式知识长龙服务,适合有JS基础的同学学习。
- 543次学习
-
- GO语言核心编程课程
- 本课程采用真实案例,全面具体可落地,从理论到实践,一步一步将GO核心编程技术、编程思想、底层实现融会贯通,使学习者贴近时代脉搏,做IT互联网时代的弄潮儿。
- 516次学习
-
- 简单聊聊mysql8与网络通信
- 如有问题加微信:Le-studyg;在课程中,我们将首先介绍MySQL8的新特性,包括性能优化、安全增强、新数据类型等,帮助学生快速熟悉MySQL8的最新功能。接着,我们将深入解析MySQL的网络通信机制,包括协议、连接管理、数据传输等,让
- 500次学习
-
- JavaScript正则表达式基础与实战
- 在任何一门编程语言中,正则表达式,都是一项重要的知识,它提供了高效的字符串匹配与捕获机制,可以极大的简化程序设计。
- 487次学习
-
- 从零制作响应式网站—Grid布局
- 本系列教程将展示从零制作一个假想的网络科技公司官网,分为导航,轮播,关于我们,成功案例,服务流程,团队介绍,数据部分,公司动态,底部信息等内容区块。网站整体采用CSSGrid布局,支持响应式,有流畅过渡和展现动画。
- 485次学习
-
- ChatExcel酷表
- ChatExcel酷表是由北京大学团队打造的Excel聊天机器人,用自然语言操控表格,简化数据处理,告别繁琐操作,提升工作效率!适用于学生、上班族及政府人员。
- 3166次使用
-
- Any绘本
- 探索Any绘本(anypicturebook.com/zh),一款开源免费的AI绘本创作工具,基于Google Gemini与Flux AI模型,让您轻松创作个性化绘本。适用于家庭、教育、创作等多种场景,零门槛,高自由度,技术透明,本地可控。
- 3378次使用
-
- 可赞AI
- 可赞AI,AI驱动的办公可视化智能工具,助您轻松实现文本与可视化元素高效转化。无论是智能文档生成、多格式文本解析,还是一键生成专业图表、脑图、知识卡片,可赞AI都能让信息处理更清晰高效。覆盖数据汇报、会议纪要、内容营销等全场景,大幅提升办公效率,降低专业门槛,是您提升工作效率的得力助手。
- 3407次使用
-
- 星月写作
- 星月写作是国内首款聚焦中文网络小说创作的AI辅助工具,解决网文作者从构思到变现的全流程痛点。AI扫榜、专属模板、全链路适配,助力新人快速上手,资深作者效率倍增。
- 4511次使用
-
- MagicLight
- MagicLight.ai是全球首款叙事驱动型AI动画视频创作平台,专注于解决从故事想法到完整动画的全流程痛点。它通过自研AI模型,保障角色、风格、场景高度一致性,让零动画经验者也能高效产出专业级叙事内容。广泛适用于独立创作者、动画工作室、教育机构及企业营销,助您轻松实现创意落地与商业化。
- 3787次使用
-
- Redis源码与设计剖析之网络连接库
- 2023-01-07 289浏览
-
- Go语言io pipe源码分析详情
- 2023-01-07 237浏览
-
- 对Go语言中的context包源码分析
- 2023-02-16 270浏览
-
- Go语言context test源码分析详情
- 2023-02-24 472浏览
-
- 向优秀代码学习:Redis 代码库源码概览
- 2023-01-29 448浏览

